All_Success trigger rule not working as expected

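Problem description

I am using trigger_rule = "all_success" in my workflow so that the next task is triggered only after all of its parent tasks have succeeded. That is not what happens.

Workflow:

Task 1 >> Task 2a, Task 2b, Task 2c (run in parallel) >> Task C

Scenario 1 - one of the tasks (2a) goes into up_for_retry, yet Task C is still executed.

Scenario 2 - one of the tasks (2a) is still in the running state, yet Task C is still executed.

Note - trigger_rule is set to all_success everywhere.

Ideally, Task C should not be triggered until Tasks 2a, 2b, and 2c have all completed successfully.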
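For reference, the behaviour expected from all_success can be written as a small predicate. This is a simplified model for illustration only, not Airflow's scheduler code; the function name `all_success_met` and the plain-string states are assumptions.

```python
def all_success_met(upstream_states):
    """Simplified model of all_success: every parent must be in state 'success'."""
    return all(state == "success" for state in upstream_states)


# Scenario 1: one parent is waiting for a retry, so Task C should not run yet.
print(all_success_met(["up_for_retry", "success", "success"]))
# Scenario 2: one parent is still running, so Task C should not run yet.
print(all_success_met(["running", "success", "success"]))
# Only when all three parents have succeeded should Task C be triggered.
print(all_success_met(["success", "success", "success"]))
```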

query_template_dict = {
    'partner_list': ['val1', 'val2'],
    'google_project': 'project_name',
    'queries': {
        'layer3': {
            'template': 'temp.sql'
        }
    },
    'applicable_tasks': {
        'val1': {
            'table_layer3': ['activity']
        },
        'val2': {
            'table_layer3': ['activity'],
        }
    }
}


for partner in query_template_dict['partner_list']:
    # Loop over applicable report queries for a partner
    applicable_tasks = query_template_dict['applicable_tasks'][partner].keys()
    for task in applicable_tasks:

        query_params = [
            {
                "name": "col1",
                "parameterType": {"type": "STRING"},
                "parameterValue": {"value": col1}
            }
        ]

        run_bq_cmd = BigQueryOperator(
            task_id=partner + '-' + task,
            trigger_rule='all_success',
            allow_large_results=True,
            dag=dag
        )
        # Creating dependency on previous tasks
        run_dummy >> run_bq_cmd

        for sub_tasks in query_template_dict['applicable_tasks'][partner][task]:

            run_sub_task = BashOperator(task_id=partner + '_' + task + '_' + sub_tasks,
                                        bash_command=bash_command,
                                        trigger_rule='all_success',
                                        dag=dag
                                        )
            run_bq_cmd >> run_sub_task
            bash_command = <some bash command>
            # end_task is instantiated here, once per sub-task iteration
            end_task = BashOperator(task_id='end_task',
                                    bash_command=bash_command,
                                    trigger_rule='all_success',
                                    dag=dag
                                    )
            # Creating dependency on previous tasks
            run_sub_task >> end_task

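The end task is invoked even though several of its parent tasks (the run_sub_task instances) have not yet completed.

Can anyone help?

Tags: airflow

Solution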


Have you checked in the Airflow UI, in the Graph view, whether the sub-tasks and the end task are linked correctly?
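The same check can also be done in code rather than in the UI. The sketch below assumes `dag` is the DAG object from the question and relies on `dag.get_task()` and the task's `upstream_task_ids` attribute; the helper name `missing_upstreams` is hypothetical.

```python
def missing_upstreams(dag, task_id, expected_ids):
    """Return the expected parent task_ids that are NOT wired upstream of task_id."""
    actual = set(dag.get_task(task_id).upstream_task_ids)
    return sorted(set(expected_ids) - actual)


# Hypothetical usage at the bottom of the DAG file, with the task_ids that the
# question's naming scheme would produce:
# print(missing_upstreams(dag, 'end_task',
#                         ['val1_table_layer3_activity',
#                          'val2_table_layer3_activity']))
```

An empty list would mean that every sub-task is registered as a parent of end_task; any task_id it returns is a parent that all_success never waits for.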

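I think the dependencies are not set up correctly: you appear to instantiate end_task inside the same loop that instantiates its parent tasks. Try the following: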

bash_command = <some bash command>
# Instantiate end_task once, outside the loops, so that every sub-task
# is linked to the same operator
end_task = BashOperator(task_id='end_task',
                        bash_command=bash_command,
                        trigger_rule='all_success',
                        dag=dag
                        )

for partner in query_template_dict['partner_list']:

    # your previous code ...
    # ...
    # ...

    for task in applicable_tasks:

        # your previous code creating run_bq_cmd ...

        for sub_tasks in query_template_dict['applicable_tasks'][partner][task]:

            run_sub_task = BashOperator(task_id=partner + '_' + task + '_' + sub_tasks,
                                        bash_command=bash_command,
                                        trigger_rule='all_success',
                                        dag=dag
                                        )

            # Create dependency on previous and next tasks
            run_bq_cmd >> run_sub_task >> end_task
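To see what the finished graph should look like, the loops can be replayed without Airflow, collecting (upstream, downstream) task_id pairs. This is only a sketch under assumptions: `expected_edges` is a hypothetical helper, the task_id formats are copied from the question, `'run_dummy'` is a placeholder for whatever task_id the run_dummy operator actually has, and end_task is treated as a single task shared by all partners.

```python
def expected_edges(applicable_tasks):
    """Replay the DAG-building loops and return (upstream, downstream) task_id pairs."""
    edges = []
    for partner, tasks in applicable_tasks.items():
        for task, sub_tasks in tasks.items():
            bq_id = partner + '-' + task            # task_id of run_bq_cmd
            edges.append(('run_dummy', bq_id))      # placeholder id for run_dummy
            for sub_task in sub_tasks:
                sub_id = partner + '_' + task + '_' + sub_task
                edges.append((bq_id, sub_id))
                edges.append((sub_id, 'end_task'))  # every sub-task feeds end_task
    return edges


applicable_tasks = {
    'val1': {'table_layer3': ['activity']},
    'val2': {'table_layer3': ['activity']},
}
parents_of_end = sorted(up for up, down in expected_edges(applicable_tasks)
                        if down == 'end_task')
print(parents_of_end)
```

The printed list is the set of parents end_task should have in the Graph view. If the real DAG shows fewer, all_success is being evaluated against an incomplete set of parents, which would explain end_task starting early.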
