airflow - Airflow - 只有在另一个 DAG 上的所有任务都成功时才运行 DAG
问题描述
我对 DAG、Airflow 和 Python 语法有点陌生(我从 Java 学习编码),但我有一个 DAG,其中包含大约 10 个彼此独立的任务,我还有另一个 DAG,只有在所有 10 个任务都成功时才能运行. 因为按照我的方式,如果一个任务失败,DAG 仍然运行其他任务,并且 DAG 被标记为成功。(这就是我想要的)
有没有办法创建一个新任务(任务 11)来完成其他任务并检查它们的状态?我找不到返回任务状态的函数
我在想这样的事情(让我们假设有一个state()
函数)
array_of_task_ids= [task1, task2, task3,...]
for i in array_of_tasks_ids
if array_of_task_id[i].state() == Failed
#这意味着如果它发现一个状态为失败的任务,它将运行一个新的虚拟任务,指示其中一个任务失败
task_sensor_failed = DummyOperator(
task_id='task_sensor_failed',
dag=dag,
)
然后在仅当此任务“task_sensor_failed”未运行时才应运行的另一个 DAG 上,我将放置传感器
external_sensor= ExternalTaskSensor(
task_id='external_sensor',
external_dag_id='10_tasks_dag',
external_task_id='task_sensor_failed',
... )
这不是我会使用的实际代码。我知道这是不对的,我只是想做一些简单的事情,这样你就明白我在做什么。我不知道,也许这是一种愚蠢的做法,但就像我说我是新手,所以我不确定我在做什么。
无论如何,一般的想法是,如果另一个 DAG 的所有 10 个任务都成功了,我只能运行一个 DAG,任何人都可以帮助我完成那个吗?对不起,很长的帖子,并提前感谢您的帮助!有什么建议么?
解决方案
一旦你知道了
ExternalTaskSensor
也可以感知一个整体DAG
(而不是一个特定task
的DAG
)DAG
如果其中任何一个叶子任务失败,则 Airflow 将其标记为失败(换句话说,仅当所有叶子任务都成功时,Airflow 才将 DAG 标记为成功)
您无需在第一个 DAG 中添加任何虚拟任务即可做到这一点
就是这样
保持第一个 DAG 不变
ExternalTaskSensor
使您的第二个 DAG以感知第一个 DAG 的开头(只需指定external_dag_id
而不指定external_task_id
)
- 如果其中任何一项任务失败,这将继续标记您的第一个 DAG 失败
- 但如果第一个 DAG 的所有任务都成功(即第一个 DAG 成功),仍然会让第二个 DAG 运行
作为扩展,如果它符合您的要求,您可以使您的第一个 DAG反应性地触发第二个 DAG(仅当它的所有任务成功时)如下
在你的第一个 DAG 中,
- 放一个
TriggerDagRunOperator
withtrigger_rule=TriggerRule.ALL_SUCCESS
(默认)和 - 使其成为所有 10 个独立任务的下游
upstream_tasks_list >> trigger_task
- 让它触发第二个 DAG
trigger_dag_id='my_2nd_dag_id'
- 放一个
保持第二个 DAG 不变
推荐阅读
- javascript - Timesheet.js:背景和标签/边框颜色
- javascript - 显示来自 Firestore 的递增值
- python - 如果所有元素都存在,则在 dict 中附加所有值并删除重复项
- spring - Docker compose 等待 Spring Boot 应用程序启动
- laravel - 在 Laravel 7 中存储具有不同哈希类型的密码
- java - 如何从火力基地中删除?
- javascript - Node 已经缓存了所有东西——包括路由和页面
- php - 如何删除包含 PHP、Laravel 中值空格的键?
- python - ModuleNotFoundError:没有名为“ebooklib”的模块
- java - 即使字符串受到限制,字符串索引也超出范围