airflow - 有条件地一一执行多个分支
问题描述
笔记
- 请仔细阅读并理解问题
- 不能简单解决
BranchPythonOperator
/ShortCircuitOperator
我们的工作流程中有一个不寻常的类似多路复用器的用例
+-----------------------+
| |
+------------>+ branch-1.begin-task |
| | |
| +-----------------------+
|
|
| +-----------------------+
| | |
+------------>+ branch-2.begin-task |
| | |
+------------+ | +-----------------------+
| | |
| MUX-task +----+ +
| | | |
+------------+ |
| |
+- -- -- -- ->
| |
|
| |
| +
|
| +-----------------------+
| | |
+------------>+ branch-n.begin-task |
| |
+-----------------------+
该流程预计将按如下方式工作
MUX-task
侦听外部队列(单队列)上的事件- 队列上的每个事件都会触发其中一个分支的执行(branch-n.begin-task)
- 一个接一个,当事件到达时,MUX-task 必须触发相应分支的执行
- 一旦所有分支都被触发,MUX 任务完成
假设
- 确切
n
的事件到达队列,一个用于触发每个分支 n
是动态已知的:它的值定义在Variable
限制
- 事件到达的外部队列只有一个
- 我们不能有
n
队列(每个分支一个),因为分支会随着时间增长(n 是动态定义的)
我们无法在 Airflow 的一组操作员和传感器(或任何此类可在引擎盖外可用的东西Airflow
)中提出解决方案来构建它
Sensor
s 可用于监听外部队列上的事件;但我们必须监听多个事件,而不是一个BranchPythonOperator
可用于触发执行多个分支中的单个分支,但它会立即将剩余的分支标记为已跳过
主要瓶颈
由于上述第二个限制,即使是结合 a Sensor
and功能的自定义运算符BranchPythonOperator
也不起作用。
我们试图围绕 , 的一个奇特组合进行头脑风暴Sensors
,DummyOperator
并trigger_rules
实现这一目标,但到目前为止还没有成功。
这在气流中可行吗?
更新-1
这里有一些背景信息来了解工作流的上下文
- 我们有一个 ETL 管道将
MySQL
表(跨多个Aurora
数据库)同步到我们的数据湖 - 为了克服我们的同步管道对生产数据库的影响,我们决定这样做
- 为每个数据库创建一个快照(从上次备份还原
AuroraDB
集群) - 使用该快照运行
MySQL
同步管道 - 在同步结束时,终止快照(
AuroraDB
集群)
- 为每个数据库创建一个快照(从上次备份还原
- 快照恢复过程的快照生命周期事件发布到队列
Aurora
SQS
- 所有数据库的单个队列
- 这个设置是由我们的 DevOps 团队完成的(不同的 AWS 账户,我们无权访问底层
Lambda
sSQS
//infra)
解决方案
XCOM
来救援!
我们决定将任务建模如下(两个任务都是自定义 operator
的)
- 这
MUX-task
更像是一个迭代sensor
- :它一直在侦听队列中的事件,并对每个到达队列的事件采取一些措施 - 所有
branch-x.begin-task
s 都是简单的传感器:它们监听一个XCOM
(谁的名字是预先定义的特定格式)的发布
工作流运行如下
- 侦听队列上的
MUX-task
事件(侦听部分包含在一个循环中,for
迭代次数与分支数一样多) - 当事件到达时,将其
MUX-task
接起;它确定应该触发哪个“分支”并XCOM
为相应的分支发布一个 - 相应的分支在它的下一次戳
sensor
中拾取XCOM
它并且分支开始执行。实际上,分支sensor
只是作为一个网关,通过外部事件 (XCOM
) 打开并允许执行分支
由于传感器太多(每个分支一个),我们很可能会使用mode='reschedule'
它来克服死锁
- 由于所描述的方法严重依赖于polling,我们认为它不是超级有效的。
- 基于反应性触发的方法会更可取,但我们无法解决
更新-1
- 如果我们可以将每个分支建模为一个单独
DAG
的分支,而不是为每个分支发布XCOM
s,就像触发分支DAG
一样TriggerDagRunOperator
,看起来“反应式”方法是可以实现的 - 但是由于我们的单体是通过复杂的逻辑以
DAG
编程方式生成的,因此这种更改将非常困难(大量代码重写)。所以我们决定继续使用基于投票的方法,并在已经需要几个小时才能完成的管道中忍受几分钟的额外延迟
更新-2
[参考问题的UPDATE-1部分]
由于我们的实际实现需要我们等待创建数据库,我们决定简化工作流程如下
- 数据库端点是通过修复的(每次恢复快照时
DNS
它们都不会改变)Aurora
- 我们取消了
MUX-task
(以及Aurora 恢复生命周期事件SQS
的队列) - 每个分支的开始任务
branch-x.begin-task
都被建模为一个简单sensor
的尝试触发一个虚拟SQL 查询 (SELECT 1
) 以检查数据库端点是否已变为活动状态
推荐阅读
- r - R中列上的变异函数循环
- python - 在window10上使用conda安装yolov3时出现错误解决环境:失败
- excel - 禁用 DDE 警告
- java - aws ecs 使用容器覆盖运行 ecs 任务
- python - Python(Jupyter Notebook):显示条形图,列作为图例,索引作为 x-ticks
- machine-learning - keras ANN 中的输入和输出形状
- javascript - 如何将 React Hook useState 与自定义实体对象一起使用?
- fortran - 如何使用 Fortran 读取 .txt 文件中的特殊行?
- ios - 无法访问目标 viewController
- java - 找不到更新 cask Generic Artifact Source