airflow - 气流将任务实例状态设置为以编程方式跳过
问题描述
我有我循环创建任务的列表。该列表的大小是静态的。
for counter, account_id in enumerate(ACCOUNT_LIST):
task_id = f"bash_task_{counter}"
if account_id:
trigger_task = BashOperator(
task_id=task_id,
bash_command="echo hello there",
dag=dag)
else:
trigger_task = BashOperator(
task_id=task_id,
bash_command="echo hello there",
dag=dag)
trigger_task.status = SKIPPED # is there way to somehow set status of this to skipped instead of having a branch operator?
trigger_task
我手动尝试过,但无法跳过任务:
start = DummyOperator(task_id='start')
task1 = DummyOperator(task_id='task_1')
task2 = DummyOperator(task_id='task_2')
task3 = DummyOperator(task_id='task_3')
task4 = DummyOperator(task_id='task_4')
start >> task1
start >> task2
try:
start >> task3
raise AirflowSkipException
except AirflowSkipException as ase:
log.error('Task Skipped for task3')
try:
start >> task4
raise AirflowSkipException
except AirflowSkipException as ase:
log.error('Task Skipped for task4')
解决方案
是的,你需要raise AirflowSkipException
from airflow.exceptions import AirflowSkipException
raise AirflowSkipException
更多信息参见源代码
推荐阅读
- html - 向不同容器添加边距时的导航栏移动
- html - Simple_form 在 Ruby on Rails 6 中未显示 file_field 上传错误消息
- java - 构造函数中如何添加观察者?
- python - 覆盖由后续导入导入的函数?
- c# - 我可以在 c# 中为一个参数接受多种类型吗?
- shell - 在 jenkins 声明性管道 cloudformation 中使用带有 shell 命令的 If 语句
- python - 在函数中按数字顺序排列打印的语句
- javascript - 如何使用 Flask 让 HTML 表单提交(和 Python 函数返回)不执行任何操作
- javascript - Vaadin 路由器:将参数路由到地址栏不起作用
- r - 将列表列表转换为 R 中的数据框,显示错误