首页 > 解决方案 > 气流调度程序不尊重 EndTime 与 datetime.now()+timedelta()

问题描述

我正在尝试安排一个 dag 每 x 秒运行一次。我将开始时间设置为过去的日期,catchup = False,结束时间设置为未来几秒钟。

尽管 dag 按预期开始,但它并没有结束,而是永远持续下去。

如果我使用像datetime(2019,9,26)这样的绝对结束时间,但不使用datetime.now()+timedelta(seconds=100) ,则 dag 结束

start_date = datetime(2019, 1, 1)
end_date = datetime.now()+timedelta(seconds=200)

default_args = {
    "owner": "airflow",
    "depends_on_past": True,
    "start_date": start_date,
    "end_date": end_date
}

dag = DAG("file_dag", catchup=False, default_args=default_args, schedule_interval=timedelta(seconds=20), max_active_runs=1)

我希望 dag 在运行 10 次或 11 次后停止执行,具体取决于它何时开始。但即使在 20 次运行后它仍会继续执行,并且似乎不会停止。

标签: airflow

解决方案


不能/不得使用 datetime.now()instart_dateend_date表达式


您观察到的行为非常明显:

  • 回想一下,dag 定义文件是在后台连续解析的。第 [6] 节在 Airflow 中限制 DAG中Airflow 变量的数量:鲜为人知的提示、技巧和最佳实践

    您的 DAG 文件每 X 秒解析一次

  • 在您的 dag-definition 文件的每个解析周期中,end_date都会在 current time 之后更新为200 seconds。由于 dag-definition-file(s) 的解析永远进行,end_date 不断变化,你得到一个永无止境的 dag


推荐阅读