airflow - Airflow 成功后多次重新运行单个任务
问题描述
按顺序重新运行任务 (A) 3 次的最佳方法是什么?:
即任务 A -> 任务 A -> 任务 A -> 任务 B
我问是因为我将运行另一个单独的数据验证任务 (B),它将比较来自这 3 个单独运行的数据。
所以这就是我到目前为止所做的:
dag = DAG("hello_world_0", description="Starting tutorial", schedule_interval='* * * * *',
start_date=datetime(2019, 1, 1),
catchup=False)
data_pull_1 = BashOperator(task_id='attempt_1', bash_command='echo "Hello World - 1!"',dag=dag)
data_pull_2 = BashOperator(task_id='attempt_2', bash_command='echo "Hello World - 2!"',dag=dag)
data_pull_3 = BashOperator(task_id='attempt_3', bash_command='echo "Hello World - 3!"',dag=dag)
data_validation = BashOperator(task_id='data_validation', bash_command='echo "Data Validation!"',dag=dag)
data_pull_1 >> data_pull_2 >> data_pull_3 >> data_validation
这可能有效,但有更优雅的方式吗?
解决方案
您可以尝试以下实现,我们使用 for 循环创建 3 个操作
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
dag = DAG(
"hello_world_0",
description="Starting tutorial",
schedule_interval=None,
start_date=datetime(2019, 1, 1),
catchup=False
)
chain_operators = []
max_attempt = 3
for attempt in range(max_attempt):
data_pull = BashOperator(
task_id='attempt_{}'.format(attempt),
bash_command='echo "Hello World - {}!"'.format(attempt),
dag=dag
)
chain_operators.append(data_pull)
data_validation = BashOperator(task_id='data_validation', bash_command='echo "Data Validation!"', dag=dag)
chain_operators.append(data_validation)
# Add downstream
for i,val in enumerate(chain_operators[:-1]):
val.set_downstream(chain_operators[i+1])
我将 schedule_interval 更改为 None,因为 with '* * * * *'
job 将被连续触发
推荐阅读
- apache-spark - Hadoop 文件格式
- python - 当 nan 序列连续 > 20 时删除数组的一部分
- html - 使用 rvest 获取 Whole Foods 商店列表
- java - 如何正确地将我的语法从字符串转换为数组
- java - 通过apache spark将行收集为列表
- php - barryvdh/laravel-cors 特定路线问题
- algorithm - 属性渐近分析。
- javascript - 如何减慢 Google 脚本的速度
- java - 我正在尝试设置 Timer 并尝试了这个逻辑,但它没有运行?
- excel - 从开发人员的 IDE (F5) 运行 Excel VBA 函数和从电子表格调用函数 (=) 有什么区别