首页 > 解决方案 > Apache Airflow 不强制执行 dagrun_timeout

问题描述

我正在使用带有顺序执行器的 Apache Airflow 版本 1.10.3,如果 DAG 尚未完成,我希望 DAG 在一定时间后失败。我尝试dagrun_timeout在示例代码中设置

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 1),
    'retries': 0,
}

dag = DAG('min_timeout', default_args=default_args, schedule_interval=timedelta(minutes=5), dagrun_timeout = timedelta(seconds=30), max_active_runs=1)

t1 = BashOperator(
    task_id='fast_task',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='slow_task',
    bash_command='sleep 45',
    dag=dag)

t2.set_upstream(t1)

slow_task单独需要超过 设置的时间限制dagrun_timeout,所以我的理解是气流应该停止 DAG 执行。但是,这不会发生,slow_task 被允许在其整个持续时间内运行。发生这种情况后,运行会被标记为失败,但这不会根据需要终止任务或 DAG。使用execution_timeoutforslow_task确实会导致任务在指定的时间限制内被终止,但我更愿意为 DAG 使用总体时间限制,而不是execution_timeout为每个任务指定。

还有什么我应该尝试实现这种行为的,或者我可以修复的任何错误吗?

标签: airflow

解决方案


Airflow 调度程序至少每运行一个循环SCHEDULER_HEARTBEAT_SEC(默认为5 秒)。

请记住at least这里,因为调度程序执行的一些操作可能会延迟其循环的下一个周期。

这些行动包括:

  • 解析 dag
  • 填满 DagBag
  • 检查 DagRun 并更新它们的状态
  • 安排下一个 DagRun

在您的示例中,延迟任务不会在 终止,dagrun_timeout因为调度程序在任务完成后执行其下一个周期。


推荐阅读