python - 如何使用分支运算符在 Airflow DAG 中分支多个路径?
问题描述
这就是我想要的,但我不知道如何在气流中实现这一点,因为这两个任务都在执行。
总结一下:
- T1 执行
- T2 执行
- 根据 T2 的输出,我想去或
option_1 -> complete
option_2 -> Do_x, Do_y -> complete
我应该如何构建这个?我有这个作为我当前的代码:
(t1 >> t2 >> option_1 >> complete)
(t1 >> t2 >> option_2 >> do_x >> do_y >> complete)
在这种情况下,t2 是一个分支运算符。
我也尝试了 for 的语法,... [option_1, option_2] ...
但我需要一个完全独立的路径来执行,而不仅仅是一个要切换的任务。
解决方案
您在代码中的依赖项对于分支是正确的。确保根据您需要的任何逻辑在分支开始时BranchPythonOperator
返回任务的。task_id
更多信息在BranchPythonOperator
这里。最后一个重要注意事项与“完成”任务有关。由于分支会聚在“完成”任务上,因此请确保将trigger_rule
其设置为“none_failed”(您也可以使用TriggerRule
类常量),这样任务就不会被跳过。
快速代码测试供您参考:
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime
DEFAULT_ARGS = dict(
start_date=datetime(2021, 5, 5),
owner="airflow",
retries=0,
)
DAG_ARGS = dict(
dag_id="multi_branch",
schedule_interval=None,
default_args=DEFAULT_ARGS,
catchup=False,
)
def random_branch():
from random import randint
return "option_1" if randint(1, 2) == 1 else "option_2"
with DAG(**DAG_ARGS) as dag:
t1 = DummyOperator(task_id="t1")
t2 = BranchPythonOperator(task_id="t2", python_callable=random_branch)
option_1 = DummyOperator(task_id="option_1")
option_2 = DummyOperator(task_id="option_2")
do_x = DummyOperator(task_id="do_x")
do_y = DummyOperator(task_id="do_y")
complete = DummyOperator(task_id="complete", trigger_rule=TriggerRule.NONE_FAILED)
t1 >> t2 >> option_1 >> complete
t1 >> t2 >> option_2 >> do_x >> do_y >> complete
推荐阅读
- java - 如何根据Arraylist整数的值循环
- ios - 如何从 Swift 中的远程目录中获取所有图像?
- reactjs - React Jest Enzyme - 无法通过简单的按钮点击测试
- powershell - 具有响应且没有窗口的Powershell脚本?
- javascript - 对字符数组使用 array.reduce()
- android - 上传到存储时Android中的StackOverFlowError
- google-chrome - 如何构建切换开/关 chrome 扩展
- javascript - 如何在 JavaScript 中对 base64 数据中的土耳其字符进行编码?
- angular - Angular Material io - 具有复选框树视图的项目 ID
- opengl - 渲染两个单独的对象,一个对象不出现在屏幕上