python - 如何多次运行完整的 DAG 而不是重复运行每个任务
问题描述
我有一个 DAG,其中有多个任务排队成简单而直接的依赖关系。
import datetime as dt
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.settings import log
def task1_cb(ds, **kwargs):
log.info('Task1 Complete for date: %s' % kwargs.get('end_date'))
def task2_cb(ds, **kwargs):
log.info('Task2 Complete for date: %s' % kwargs.get('end_date'))
def task3_cb(ds, **kwargs):
log.info('Task3 Complete for date: %s' % kwargs.get('end_date'))
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'concurrency': 1,
'retries': 0
}
dag = DAG(
'sample_serial_dag',
start_date=dt.datetime(2018,9,1),
end_date=dt.datetime(2018,9,5),
default_args=default_args,
schedule_interval='@daily',
catchup=True
)
task1 = PythonOperator(task_id='t1', provide_context=True, python_callable=task1_cb, dag=dag)
task2 = PythonOperator(task_id='t2', provide_context=True, python_callable=task2_cb, dag=dag)
task3 = PythonOperator(task_id='t3', provide_context=True, python_callable=task3_cb, dag=dag)
task1 >> task2 >> task3
我希望它能够赶上过去的日期(正在运行@daily
)。我现在得到的是任务 1 运行 5 次以赶上 5 个截止日期,完成后转到任务 2,然后运行 5 次,依此类推。执行流程如下:
Task1 Complete for date: 2018-09-01
Task1 Complete for date: 2018-09-02
Task1 Complete for date: 2018-09-03
Task1 Complete for date: 2018-09-04
Task1 Complete for date: 2018-09-05
Task2 Complete for date: 2018-09-01
Task2 Complete for date: 2018-09-02
Task2 Complete for date: 2018-09-03
Task2 Complete for date: 2018-09-04
Task2 Complete for date: 2018-09-05
Task3 Complete for date: 2018-09-01
Task3 Complete for date: 2018-09-02
Task3 Complete for date: 2018-09-03
Task3 Complete for date: 2018-09-04
Task3 Complete for date: 2018-09-05
我想要的是以下内容:
执行流程如下:
Task1 Complete for date: 2018-09-01
Task2 Complete for date: 2018-09-01
Task3 Complete for date: 2018-09-01
Task1 Complete for date: 2018-09-02
Task2 Complete for date: 2018-09-02
Task3 Complete for date: 2018-09-02
Task1 Complete for date: 2018-09-03
Task2 Complete for date: 2018-09-03
Task3 Complete for date: 2018-09-03
Task1 Complete for date: 2018-09-04
Task2 Complete for date: 2018-09-04
Task3 Complete for date: 2018-09-04
Task1 Complete for date: 2018-09-05
Task2 Complete for date: 2018-09-05
Task3 Complete for date: 2018-09-05
解决方案
这种奇怪行为的原因是default_args
设置depends_on_past
为False。我从一些教程或示例代码中复制粘贴了它,但实际上并没有注意到它的作用。根据文档:
depends_on_past (bool) – 设置为 true 时,任务实例将按顺序运行,同时依赖前一个任务的计划来成功。start_date 的任务实例被允许运行。
将其设置为 True 解决了我的问题。
推荐阅读
- image - Keras - 图像的均方误差 (MSE) 计算定义?
- android - 未处理的异常:错误状态:平台不允许使用不安全的 HTTP - usesClearTextTraffic 不起作用
- c# - 如何在 C# 中验证 webhook 的 RS256 签名属性?
- javascript - 多人 JavaScript 游戏如何与服务器通信?
- jquery - 如何在 ASP.NET Core 中的表单的 data-ajax-failure 函数中捕获 Ajax 错误?
- python-3.x - 使用 ffmpeg 下载 m3u8 段而不将 .ts 文件写入磁盘
- java - 无法从 EditText 获取值
- android - 半透明状态栏不半透明
- modal-dialog - 带有延迟的 Javascript 弹出窗口
- asp.net-core - 从 ModelExpression 中提取集合索引