airflow - 气流 - 如何将一个操作员的输出作为输入传递给另一个任务
问题描述
我有一个 http 端点列表,每个端点都单独执行一项任务。我们正在尝试编写一个应用程序,该应用程序将通过以特定顺序调用这些端点来进行编排。在这个解决方案中,我们还必须处理一个 http 端点的输出并为下一个 http 端点生成输入。此外,可以根据触发器同时调用相同的工作流。
到目前为止我所做的, 1. 定义了一个从 HttpOperator 派生的新运算符,并引入了将 http 端点的输出写入文件的功能。2.编写了一个python运算符,可以根据必要的逻辑传输输出。
由于我可以执行同一工作流的多个实例,因此我无法对输出文件名进行硬编码。有没有办法让我编写的 http 运算符写入一些唯一的文件名,并且下一个任务应该可以使用相同的文件名,以便它可以读取和处理输出。
解决方案
Airflow 确实具有称为 XCom 的操作员交叉通信功能
XCom 可以“推送”(发送)或“拉取”(接收)。当一个任务推送一个 XCom 时,它通常可用于其他任务。任务可以通过调用 xcom_push() 方法随时推送 XCom。
任务调用 xcom_pull() 来检索 XCom,可选地应用基于键、源 task_ids 和源 dag_id 等标准的过滤器。
推送到 XCOM 使用
ti.xcom_push(key=<variable name>, value=<variable value>)
要拉出 XCOM 对象,请使用
myxcom_val = ti.xcom_pull(key=<variable name>, task_ids='<task to pull from>')
使用 bash operator ,您只需设置xcom_push = True
并将 stdout 中的最后一行设置为 xcom 对象。
您可以在任务运行时查看 xcom 对象,只需从气流 UI 中打开 tast 执行并单击 xcom 选项卡即可。
推荐阅读
- python - Python - 如果从另一个目录运行,找不到图像文件?
- node.js - 将 Webpack 与 Auth0 集成或如何将其他 Node.js 模块添加到 IBM Cloud Functions
- javascript - 每个属性的模态屏幕
- node.js - Gulp Fontello 使用 adm-zip 出错,最近的现象
- android - 这些问题中的一个或多个是违反政策的。您必须解决这些问题。阿德莫布?
- python - 无法在 tkinter 框架中检查多个 Checkbutton
- javascript - react-syntax-highlighter 中的高亮行
- snowflake-cloud-data-platform - Snowflake JDBC 驱动程序在每个 SAML 连接上将用户重定向到浏览器,只是为了显示一条消息。在 3.6.7 中修复并在 3.12.8 中再次损坏
- javascript - Angular 10 中的动画
- php - 如何在不发出任何外部网络请求的情况下,在 PHP 中将字符串从一种语言翻译成另一种语言?