首页 > 解决方案 > 从一个 Airflow DAG 返回值到另一个

问题描述

我的 DAG(我们称之为 DAG_A)使用trigger_dagrun运算符启动另一个 DAG(DAG_B)。DAG_B 的任务使用 XCOM,我想在完成后从 DAG_B 运行的任务之一(正是我开始的任务)中获取 XCOM 值。

使用 XCOM 并不是硬性要求——基本上 Airflow 本身提供的任何(合理的)机制都可以工作。如果需要,我可以更改 DAG_B。

找不到此类案例的任何示例,因此感谢您的帮助。

计划 B 是让 DAG_B 将 XCOM 值与一些运行 ID 一起保存到一些持久性存储(如 DB 或文件)中,而 DAG_A 将从那里获取它。但如果有一些内置机制可用,我想避免这种复杂情况。

标签: pythonairflow

解决方案


可以dag_id通过传入to xcom_pull()(参见task_instance.xcom_pull()函数文档)从另一个 dag 中提取 XCOM 值。只要您使用与当前 DAG 相同的执行日期触发 subdag,这将起作用。通过模板化execution_date值可以轻​​松实现:

trigger = TriggerDagRunOperator(
    task_id="trigger_dag_b",
    trigger_dag_id="DAG_B",
    execution_date="{{ execution_date }}",
    ...
)

然后,如果您使用ExternalTaskSensor传感器等待特定任务完成或wait_for_completion=True在您的TriggerDagRunOperator()任务中使用,您可以稍后拉 XCOM task_instance.xcom_pull(dag_id="DAG_B", ...)(添加任务 ID 和/或您想要拉的 XCOM 密钥)。

如果您不反对编写 Python 运算符,也可以导入XCom模型并直接使用其XCom.get_one()方法

value = XCom.get_one(
    execution_date=ti.execution_date,
    key="target key",
    task_id="some.task.id",
    dag_id="DAG_B",
)

我使用了类似的技术,使用 multi-dagrun 触发器(处理可变数量的资源);这更棘手,因为在这种情况下您不能重复使用执行日期(每个 dagrun 必须有一个唯一的 (dag_id, execution_date) 元组)。

在这些情况下,我要么使用直接查询(使用触发器存储在 XCom 中的 dagrun id将 SQLAlchemyXCom模型与模型结合起来,而不是依赖于执行日期匹配),要么通过预先配置 subdags避免整个问题。后者是通过使用配置设置子 dag 来实现的,该配置告诉它在哪里输出父 dag 然后拾取的结果。文档似乎没有正确提及这一点,但是支持模板的参数也可以,因此您可以在那里生成字典作为子 dag 的输入,子 dag 中的任务然后通过.DagRunconfTriggerDagRun()params


推荐阅读