首页 > 解决方案 > Airflow 2.0 中的 DAG 计划

问题描述

如何在 Airflow 2.0 中安排 DAG 使其不在假期运行?

问题 1 : 每个月的第 5 个工作日运行?问题 2 : 每月第 5 个工作日运行,如果第 5 天是假期,那么它应该在第二天不是假期的时候运行?

标签: airflow-schedulerairflow

解决方案


目前这无法完成(至少不是本机)。Airflow DAG 接受单个 cron 表达式或 timedelta。如果你不能用其中之一说出所需的调度逻辑,那么你就不能在 Airflow 中进行这种调度。好消息是 Airflow 有AIP-39 更丰富的 scheduler_interval来解决这个问题,并在未来的版本中提供更多的调度能力。

也就是说,您可以通过将 DAG 设置为运行schedule_interval="@daily"并放置BranchPythonOperator为 DAG 的第一个任务来解决此问题。在 Python 可调用文件中,您可以编写所需的调度逻辑,这意味着如果是每月的第 5 个工作日,您的函数将返回 True,否则将返回 False,并且您的工作流将相应地分支。对于 True 它将继续执行,对于 False 它将结束。这并不理想,但它会起作用。一个可能的模板可以是:

def check_fifth():
    #write the logic of finding if today is the 5th working day of the month

    if logic:
        return "continue_task"
    else:
        return "stop_task"

with DAG(dag_id='stackoverflow',
         default_args=default_args,
         schedule_interval="@daily",
         catchup=False
         ) as dag:

    start_op = BranchPythonOperator(
        task_id='choose',
        python_callable=check_fifth
    )

    stop_op = DummyOperator(
        task_id='stop_task'
    )

    #replace with the operator that you want to actually execute
    continue_op = YourOperator(
        task_id='continue_task'
    )

    start_op >> [stop_op, continue_op]

推荐阅读