airflow - All_Success 触发规则未按预期工作
问题描述
我在我的工作流程中使用 trigger rule = "all_sucess" 来处理所有父任务都成功然后只有下一个任务被触发的场景。但同样的事情并没有发生。
工作流程:
任务 1 >> 任务 2a、任务 2b、任务 2c(并行运行)>> 任务 c
场景 1 - 任务 2a 之一进入 up_for_retry ,即使这样,任务 c 也被执行
场景 2 - 其中一项任务 2a 处于运行状态,但任务 c 仍被执行
注意- 在所有地方,我们都将 trigger_rule 设置为 all_success
在理想情况下,任务 C 不应该被触发,直到所有任务 2a、2b、2c 都成功完成。
query_template_dict = {
'partner_list' = ['val1', 'val2']
'google_project': 'project_name',
'queries': {
'layer3': {
'template': 'temp.sql'
}
},
'applicable_tasks': {
'val1': {
'table_layer3': ['activity']
},
'val2': {
'table_layer3': ['activity'],
}
}
}
for partner in query_template_dict['partner_list']:
# Loop over applicable report queries for a partner
applicable_tasks = query_template_dict['applicable_tasks'][partner].keys()
for task in applicable_tasks:
query_params=[
{
"name": "col1",
"parameterType": { "type": "STRING" },
"parameterValue": { "value": col1}
}
]
run_bq_cmd = BigQueryOperator (
task_id =partner + '-' + task
trigger_rule ='all_success',
allow_large_results =True,
dag=dag
)
# Creating dependency on previous tasks
run_dummy >> run_bq_cmd
for sub_tasks in query_template_dict['applicable_tasks'][partner][task]:
run_sub_task = BashOperator(task_id = partner+ '_' + task + '_' + sub_tasks,
bash_command = bash_command,
trigger_rule= 'all_success',
dag = dag
)
run_bq_cmd >> run_sub_task
bash_command = <some bash command>
end_task = BashOperator( task_id = 'end_task',
bash_command= bash_command,
trigger_rule= 'all_success',
dag= dag
)
# Creating dependency on previous tasks
run_sub_task >> end_task
即使多个父任务(运行子任务)尚未完成,也会调用结束任务。
有人可以帮忙吗
解决方案
如果您可以从图形的角度看到正确链接的子任务和最终任务,您是否检查过 Airlfow UI?
我认为您没有很好地设置依赖关系,并且您似乎在end_task
实例化其父任务的同一循环中实例化。尝试执行以下操作:
for partner in query_template_dict['partner_list']:
# your previous code ...
# ...
# ...
bash_command = <some bash command>
end_task = BashOperator( task_id = 'end_task',
bash_command= bash_command,
trigger_rule= 'all_success',
dag= dag
)
for sub_tasks in query_template_dict['applicable_tasks'][partner][task]:
run_sub_task = BashOperator(task_id = partner+ '_' + task + '_' + sub_tasks,
bash_command = bash_command,
trigger_rule= 'all_success',
dag = dag
)
# Create dependency on previous and next tasks
run_bq_cmd >> run_sub_task >> end_task
推荐阅读
- javascript - 未捕获的referenceError:未定义的变量,即使我将其设置为全局
- angular - 使用带角度的蚂蚁设计组件
- ios - iOS 13 暗模式:traitCollectionDidChange 仅在第一次调用
- angular - Angular 6:没有出现
- java - 在 lombok 之后未调用覆盖 Setter 方法
- python - 如果我在非常小的训练数据集上不能得到 0 错误,这意味着什么?
- sql - 如何对 BigQuery SQL 中重复的不同列进行求和
- google-ads-api - 如何在 codigniter 中实现 google adsense
- node.js - Mongoose:计算 $lookup 聚合步骤返回的数组中不同字段值的数量
- hadoop - 如何在动态 hdfs 目录上创建分区 hive 表