首页 > 解决方案 > Airflow BranchPythonOperator - 分支后继续

问题描述

我有以下运算符,如下所示。我能够直观地看到图形表示看起来是正确的。但是,我无法使功能正常工作,因为路径不会继续经过任何一个分支。无论日期如何,两条路径都不会继续到 task_05。

DAG 有两 (2) 条路径:

(1)如果是月初,task_01->test_step->task_02->task_05->task_06

(2)如果不是月初,task_01->test_step->task_03->task_04->task_05->task_06

问题:该功能无法让 DAG 一直完成到 task_06。

假设问题在于我使用按位运算符的方式。

代码

from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.models import DAG


default_args = {
    'owner': 'astro',
    'depends_on_past': False,
    'start_date': datetime(2020, 5, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='astro_test',
    default_args=default_args,
    schedule_interval="0 10 * * *",
    catchup=True,
    max_active_runs=1
)

def dom_branch(ds_nodash, **kwargs):

    if ds_nodash[-2:] == '01':
        return "task_02"
    else:
        return "task_03"

with dag:
    one = DummyOperator(task_id='task_01')
    two = DummyOperator(task_id='task_02')
    three = DummyOperator(task_id='task_03')
    four = DummyOperator(task_id='task_04')
    five = DummyOperator(task_id='task_05')
    six = DummyOperator(task_id='task_06')

    dom_operator = BranchPythonOperator(
        task_id=f"test_step",
        provide_context=True,
        op_kwargs={'ds_nodash': '{{ ds_nodash }}'},
        python_callable=dom_branch
    )

    one >> dom_operator >> [two, three] 
    two >> five
    three >> four >> five   
    five >> six

每月 1 日 DAG 图表视图的屏幕截图(本月 1 日)

不是每月 1 日 DAG 图表视图的屏幕截图(不是 1 日)

标签: pythonairflow

解决方案


这是因为它task_5需要两个上游都完成才能开始。相反,您可以添加触发规则: https ://airflow.apache.org/docs/stable/concepts.html#trigger-rules

five = DummyOperator(
    task_id='task_05',
    trigger_rule=TriggerRule.ONE_SUCCESS,
)

推荐阅读