python - 从一个 Airflow DAG 返回值到另一个
问题描述
我的 DAG(我们称之为 DAG_A)使用trigger_dagrun
运算符启动另一个 DAG(DAG_B)。DAG_B 的任务使用 XCOM,我想在完成后从 DAG_B 运行的任务之一(正是我开始的任务)中获取 XCOM 值。
使用 XCOM 并不是硬性要求——基本上 Airflow 本身提供的任何(合理的)机制都可以工作。如果需要,我可以更改 DAG_B。
找不到此类案例的任何示例,因此感谢您的帮助。
计划 B 是让 DAG_B 将 XCOM 值与一些运行 ID 一起保存到一些持久性存储(如 DB 或文件)中,而 DAG_A 将从那里获取它。但如果有一些内置机制可用,我想避免这种复杂情况。
解决方案
您可以dag_id
通过传入to xcom_pull()
(参见task_instance.xcom_pull()
函数文档)从另一个 dag 中提取 XCOM 值。只要您使用与当前 DAG 相同的执行日期触发 subdag,这将起作用。通过模板化execution_date
值可以轻松实现:
trigger = TriggerDagRunOperator(
task_id="trigger_dag_b",
trigger_dag_id="DAG_B",
execution_date="{{ execution_date }}",
...
)
然后,如果您使用ExternalTaskSensor
传感器等待特定任务完成或wait_for_completion=True
在您的TriggerDagRunOperator()
任务中使用,您可以稍后拉 XCOM task_instance.xcom_pull(dag_id="DAG_B", ...)
(添加任务 ID 和/或您想要拉的 XCOM 密钥)。
如果您不反对编写 Python 运算符,也可以导入XCom
模型并直接使用其XCom.get_one()
方法:
value = XCom.get_one(
execution_date=ti.execution_date,
key="target key",
task_id="some.task.id",
dag_id="DAG_B",
)
我使用了类似的技术,使用 multi-dagrun 触发器(处理可变数量的资源);这更棘手,因为在这种情况下您不能重复使用执行日期(每个 dagrun 必须有一个唯一的 (dag_id, execution_date) 元组)。
在这些情况下,我要么使用直接查询(使用触发器存储在 XCom 中的 dagrun id将 SQLAlchemyXCom
模型与模型结合起来,而不是依赖于执行日期匹配),要么通过预先配置 subdags来避免整个问题。后者是通过使用配置设置子 dag 来实现的,该配置告诉它在哪里输出父 dag 然后拾取的结果。文档似乎没有正确提及这一点,但是支持模板的参数也可以,因此您可以在那里生成字典作为子 dag 的输入,子 dag 中的任务然后通过.DagRun
conf
TriggerDagRun()
params
推荐阅读
- python - 是否有将列表转换为函数的内置方法?
- amazon-web-services - Kinesis 是否保证交货?
- python - Django 的 GEOS API 中的几何交集不再起作用
- cloud - Terraform:云存储失败:存储:存储桶不存在
- reactjs - 禁用密码输入中的黄色框反应本机
- excel - 避免在 Excel 工作表中打印空视图/区域
- python - 从 Pandas 的列中删除错误值
- java - Java - 返回多种返回类型
- github - VSCode 拉取请求 - 组织
- javascript - Content-Security-Policy 阻止 Vue.js