首页 > 解决方案 > Airflow - 只有在另一个 DAG 上的所有任务都成功时才运行 DAG

问题描述

我对 DAG、Airflow 和 Python 语法有点陌生(我从 Java 学习编码),但我有一个 DAG,其中包含大约 10 个彼此独立的任务,我还有另一个 DAG,只有在所有 10 个任务都成功时才能运行. 因为按照我的方式,如果一个任务失败,DAG 仍然运行其他任务,并且 DAG 被标记为成功。(这就是我想要的)

有没有办法创建一个新任务(任务 11)来完成其他任务并检查它们的状态?我找不到返回任务状态的函数

我在想这样的事情(让我们假设有一个state()函数)

array_of_task_ids= [task1, task2, task3,...]
for i in array_of_tasks_ids
if array_of_task_id[i].state() == Failed 

#这意味着如果它发现一个状态为失败的任务,它将运行一个新的虚拟任务,指示其中一个任务失败

 task_sensor_failed = DummyOperator(
 task_id='task_sensor_failed',
 dag=dag,
 )

然后在仅当此任务“task_sensor_failed”未运行时才应运行的另一个 DAG 上,我将放置传感器

external_sensor= ExternalTaskSensor(
task_id='external_sensor',
external_dag_id='10_tasks_dag',
external_task_id='task_sensor_failed',

... )

这不是我会使用的实际代码。我知道这是不对的,我只是想做一些简单的事情,这样你就明白我在做什么。我不知道,也许这是一种愚蠢的做法,但就像我说我是新手,所以我不确定我在做什么。

无论如何,一般的想法是,如果另一个 DAG 的所有 10 个任务都成功了,我只能运行一个 DAG,任何人都可以帮助我完成那个吗?对不起,很长的帖子,并提前感谢您的帮助!有什么建议么?

标签: airflow

解决方案


一旦你知道了

  • ExternalTaskSensor也可以感知一个整体DAG(而不是一个特定taskDAG
  • DAG如果其中任何一个叶子任务失败,则 Airflow 将其标记为失败(换句话说,仅当所有叶子任务都成功时,Airflow 才将 DAG 标记为成功)

您无需在第一个 DAG 中添加任何虚拟任务即可做到这一点


就是这样

  1. 保持第一个 DAG 不变

  2. ExternalTaskSensor使您的第二个 DAG以感知第一个 DAG 的开头(只需指定external_dag_id而不指定external_task_id

  • 如果其中任何一项任务失败,这将继续标记您的第一个 DAG 失败
  • 但如果第一个 DAG 的所有任务都成功(即第一个 DAG 成功),仍然会让第二个 DAG 运行

作为扩展,如果它符合您的要求,您可以使您的第一个 DAG反应性地触发第二个 DAG(仅当它的所有任务成功时)如下

  1. 在你的第一个 DAG 中,

  2. 保持第二个 DAG 不变


推荐阅读