首页 > 解决方案 > 有条件地一一执行多个分支

问题描述

笔记


我们的工作流程中有一个不寻常的类似多路复用器的用例

                                +-----------------------+
                                |                       |
                  +------------>+  branch-1.begin-task  |
                  |             |                       |
                  |             +-----------------------+
                  |
                  |
                  |             +-----------------------+
                  |             |                       |
                  +------------>+  branch-2.begin-task  |
                  |             |                       |
+------------+    |             +-----------------------+
|            |    |
|  MUX-task  +----+                         +
|            |    |                         |
+------------+    |
                  |                         |
                  +- -- -- -- ->
                  |                         |
                  |
                  |                         |
                  |                         +
                  |
                  |             +-----------------------+
                  |             |                       |
                  +------------>+  branch-n.begin-task  |
                                |                       |
                                +-----------------------+

该流程预计将按如下方式工作

假设

限制


我们无法在 Airflow 的一组操作员和传感器(或任何此类可在引擎盖外可用的东西Airflow)中提出解决方案来构建它

  1. Sensors 可用于监听外部队列上的事件;但我们必须监听多个事件,而不是一个
  2. BranchPythonOperator可用于触发执行多个分支中的单个分支,但它会立即将剩余的分支标记为已跳过

主要瓶颈

由于上述第二个限制,即使是结合 a Sensorand功能的自定义运算符BranchPythonOperator也不起作用。

我们试图围绕 , 的一个奇特组合进行头脑风暴SensorsDummyOperatortrigger_rules实现这一目标,但到目前为止还没有成功。

这在气流中可行吗?


更新-1

这里有一些背景信息来了解工作流的上下文

标签: airflow

解决方案


XCOM来救援!


我们决定将任务建模如下(两个任务都是自定义 operator的)

  • MUX-task更像是一个迭代sensor- :它一直在侦听队列中的事件,并对每个到达队列的事件采取一些措施
  • 所有branch-x.begin-tasks 都是简单的传感器:它们监听一个XCOM(谁的名字是预先定义的特定格式)的发布

工作流运行如下

  • 侦听队列上的MUX-task事件(侦听部分包含在一个循环中,for迭代次数与分支数一样多)
  • 当事件到达时,将其MUX-task接起;它确定应该触发哪个“分支”并XCOM为相应的分支发布一个
  • 相应的分支在它的下一次戳sensor中拾取XCOM它并且分支开始执行。实际上,分支sensor只是作为一个网关,通过外部事件 ( XCOM) 打开并允许执行分支

由于传感器太多(每个分支一个),我们很可能会使用mode='reschedule'它来克服死锁


  • 由于所描述的方法严重依赖于polling,我们认为它不是超级有效的。
  • 基于反应性触发的方法会更可取,但我们无法解决

更新-1

  • 如果我们可以将每个分支建模为一个单独DAG的分支,而不是为每个分支发布XCOMs,就像触发分支DAG一样TriggerDagRunOperator,看起来“反应式”方法是可以实现的
  • 但是由于我们的单体是通过复杂的逻辑以DAG编程方式生成的,因此这种更改将非常困难(大量代码重写)。所以我们决定继续使用基于投票的方法,并在已经需要几个小时才能完成的管道中忍受几分钟的额外延迟

更新-2

[参考问题的UPDATE-1部分]

由于我们的实际实现需要我们等待创建数据库,我们决定简化工作流程如下

  • 数据库端点是通过修复的(每次恢复快照时DNS它们都不会改变)Aurora
  • 我们取消了MUX-task(以及Aurora 恢复生命周期事件SQS的队列)
  • 每个分支的开始任务branch-x.begin-task都被建模为一个简单sensor的尝试触发一个虚拟SQL 查询 ( SELECT 1) 以检查数据库端点是否已变为活动状态

推荐阅读