首页 > 解决方案 > Airflow- 使用 TriggerDAGRunOperator 触发不同调度的 DAG

问题描述

我是气流新手。我需要为具有不同时间表的 DAG 依赖项提出一个干净简单的解决方案。

我有 DAG 1 每天运行,DAG 2 - 每周运行。如何使用 TriggerDAGRunOperator 从 Daily one 触发每周 DAG?

第 1 天:

with DAG('DAG 1',
     schedule_interval='0 10 * * *'
     ) as dag:

TASK1 = BashOperator(task_id='TASK1',
                bash_command='sample')
TRG_TASK=TriggerDAGRunOperator(task_id='TRG_TASK',trigger_dag_id='DAG 2')

TASK1 >> TRG_TASK

第 2 天:

with DAG('DAG 2',
     schedule_interval='15 10 * * 5'
     ) as dag:

TASK1 = BashOperator(task_id='TASK1',
                bash_command='sample')

我知道我可以使用 ExternalTask​​Sensor Operator 并提及 timedelta,但从长远来看它会变得混乱。

TriggerDAGRunOperator 是否有任何简单/干净的选项来每天检查 DAG 2 是否确实计划在当天运行,然后只触发它,否则在其他日子跳过它?

谢谢

标签: pythonairflow-schedulerairflow

解决方案


总是DAG2会被触发DAG1并且您只想DAG2每周运行一次,还是希望通过您列出触发DAG2的 cron 计划每周执行一次?DAG1

如果期望DAG2只是被触发,DAG1但您只想DAG2每周执行一次,您可以使用ShortCircuitOperatorinDAG1DAG2更新schedule_intervalof DAG2to None(这意味着 DAG 只会被触发 - 手动或以编程方式)。

  • 选项 1:在中,在执行 which 检查之前DAG1添加一个任务,以查看当前日期是否是所需的星期几。如果是,则继续触发。ShortCircuitOperatorTriggerDagRunOperatorDAG2

  • 选项 2:在中,在执行一周中同一天检查的工作流的开头DAG2添加。ShortCircuitOperator同样,如果需要一周中的某一天,则继续处理 DAG 的其余部分。


推荐阅读