首页 > 解决方案 > BranchOperator 正在跳过气流

问题描述

我有这个流程: -

execution_date_hour = "{{ execution_date.strftime('%H') }}"

default_args = {
    'owner': 'hourly-airflow',
    'depends_on_past': False,
    'catch_up': False,
    'start_date': days_ago(1),
    'email': failure_email_list,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('hourly_pipeline_dag',
          default_args=default_args,
          tags=['hourly'],
          schedule_interval='@hourly',
          catchup=False)

taskA = PostgresOperator(dag=dag,
                         task_id='taskA', 
                         postgres_conn_id='database_connection',
                         sql='sql/hourly_entry.sql')

taskb = DummyOperator(
    dag=dag,
    task_id="taskb"
)

taske = DummyOperator(
    dag=dag,
    task_id="taske"
)

taskc = DummyOperator(
    dag=dag,
    task_id="taskc"
)

taskd = DummyOperator(
    dag=dag,
    task_id="taskd"
)

branch_op = BranchPythonOperator(
    task_id='branch_op',
    python_callable=lambda
        **kwargs: 'feed_sensor_a' if execution_date_hour == '5' else 'feed_sensor_b',
    dag=dag)

feed_sensor_a = SqlSensor(dag=dag,
                          task_id='feed_sensor_a',
                          conn_id='database_connection',
                          sql='sql/sensor_hourly.sql',
                          poke_interval=30,
                          trigger_rule=TriggerRule.ONE_SUCCESS,
                          timeout=3600)

feed_sensor_b = SqlSensor(dag=dag,
                          task_id='feed_sensor_b',
                          conn_id='database_connection',
                          sql='sql/sensor.sql',
                          poke_interval=30,
                          trigger_rule=TriggerRule.ONE_SUCCESS,
                          timeout=3600)

taskA >> [taskb,taskc]
taskb >> taskd
taskc >> taske
[taskd,taske] >> branch_op
branch_op >> [feed_sensor_a,feed_sensor_b] 

管道一直运行到 taskd 和 taske,branch_op 被跳过。请帮忙,我被困在这个问题上太久了。直到 taske 和 taskd 运行良好,branch_op 以红色突出显示,即跳过,不知道这里发生了什么。(所有这些任务都是虚拟任务,实际上它们是 HttpOperator 和 Postgres op)。提前感谢,如果需要任何其他信息,请告诉我。

标签: pythonairflow

解决方案


运行您的代码我没有看到branch_op任务失败或被跳过。但是,我认为您的BranchPythonOperator任务不会像您希望的那样工作。没有输入传递到 lambda 函数,python_callable也不是运算符的模板化字段(即,逻辑正在评估文字字符串"{{ execution_date.strftime('%H') }}",因此流程将始终跟随feed_sensor_b。试试这个:

branch_op = BranchPythonOperator(
    task_id="branch_op",
    python_callable=lambda execution_date_hour: "feed_sensor_a" if execution_date_hour == "5" else "feed_sensor_b",
    op_args=[execution_date_hour],
    dag=dag,
)

推荐阅读