airflow - 通用 Airflow 数据暂存运算符
问题描述
我正在尝试了解如何使用 Airflow 管理大数据。文档清楚地表明我们需要使用外部存储,而不是 XCom,但我找不到任何将数据转移到工作节点和从工作节点转移数据的干净示例。
我的期望是应该有一个操作员可以在运行中运行分期,运行主要操作,然后再次分期。
有这样的运算符或模式吗?我发现的关闭是一个S3 文件转换,但它运行一个可执行文件来进行转换,而不是一个通用的 Operator,例如我们想要使用的 DockerOperator。
我见过的其他“解决方案”依赖于在单个主机上运行的所有内容,并使用已知路径,这不是生产就绪的解决方案。
是否有这样的运营商可以支持数据分期,或者是否有使用 Airflow 处理大数据的具体示例,而不依赖于每个运营商都配备云应对能力?
解决方案
是和不是。传统上,Airflow 主要是协调器——所以它通常不会“做”这些东西,它通常会告诉其他人该做什么。您很少需要将实际数据带给 Airflow 工作人员,工作人员主要是告诉其他人数据来自哪里,如何处理以及将其发送到哪里。
有例外(一些传输运营商实际上从一个服务下载数据并将其上传到另一个) - 所以数据通过 Airflow 节点,但这是一个例外而不是规则(更有效和更好的模式是调用外部服务进行传输并让传感器等待它完成)。
这更像是 Airflow 的“历史”和“当前”的运行方式,但是随着 Airflow 2 及以后的版本,我们正在扩展这一点,并且越来越有可能执行类似于您所描述的模式,这就是 XCom 发挥作用的地方那里有很大的作用。
您可以 - 最近 - 开发自定义 XCom 后端,允许的不仅仅是元数据共享 - 它们也有利于共享数据。您可以在此处查看文档https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html#custom-backends但您也有来自 Astronomer 的这篇不错的文章https://www.astronomer .io/guides/custom-xcom-backends以及 2021 年 Airflow 峰会(从上周开始)的精彩演讲,其中介绍了:https ://airflowsummit.org/sessions/2021/customizing-xcom-to-enhance-data-sharing -任务之间/。我强烈推荐观看演讲!
查看您的模式 - XCom Pull 正在暂存,Operator 的 execute() 是操作,而 XCom Push 正在暂存。
这种模式将得到加强,我认为即将发布的 Airflow 版本和即将到来的一些很酷的集成。未来可能会有更酷的数据共享选项(但我认为它们都将基于 - 可能稍微增强 - XCom 实现)。
推荐阅读
- python-3.x - 如何抓取无法检查的数据和内部的数据
- javascript - 如何使用范围接口在双击时选择一个单词?
- java - Java Generic:如何扩展另一个泛型类的泛型类
- angular - 从 mergeMap 返回的 observable 无法捕获错误
- javascript - 用css将文本消失
- python - 尝试 Skimage.Filter 中的所有阈值
- java - 如何在不同项目中使用 Gradle 中的父级?
- telnet - How to connect with sshd?
- angular - 为每个文件夹自定义 TypeScript 编译器选项
- rest - 带有不记名令牌的 SharePoint 搜索 REST API 返回错误的项目数