首页 > 解决方案 > 气流 - 如何将一个操作员的输出作为输入传递给另一个任务

问题描述

我有一个 http 端点列表,每个端点都单独执行一项任务。我们正在尝试编写一个应用程序,该应用程序将通过以特定顺序调用这些端点来进行编排。在这个解决方案中,我们还必须处理一个 http 端点的输出并为下一个 http 端点生成输入。此外,可以根据触发器同时调用相同的工作流。

到目前为止我所做的, 1. 定义了一个从 HttpOperator 派生的新运算符,并引入了将 http 端点的输出写入文件的功能。2.编写了一个python运算符,可以根据必要的逻辑传输输出。

由于我可以执行同一工作流的多个实例,因此我无法对输出文件名进行硬编码。有没有办法让我编写的 http 运算符写入一些唯一的文件名,并且下一个任务应该可以使用相同的文件名,以便它可以读取和处理输出。

标签: airflow

解决方案


Airflow 确实具有称为 XCom 的操作员交叉通信功能

XCom 可以“推送”(发送)或“拉取”(接收)。当一个任务推送一个 XCom 时,它通常可用于其他任务。任务可以通过调用 xcom_push() 方法随时推送 XCom。

任务调用 xcom_pull() 来检索 XCom,可选地应用基于键、源 task_ids 和源 dag_id 等标准的过滤器。

推送到 XCOM 使用

ti.xcom_push(key=<variable name>, value=<variable value>)

要拉出 XCOM 对象,请使用

myxcom_val = ti.xcom_pull(key=<variable name>, task_ids='<task to pull from>')

使用 bash operator ,您只需设置xcom_push = True并将 stdout 中的最后一行设置为 xcom 对象。

您可以在任务运行时查看 xcom 对象,只需从气流 UI 中打开 tast 执行并单击 xcom 选项卡即可。


推荐阅读