首页 > 解决方案 > 如何多次运行完整的 DAG 而不是重复运行每个任务

问题描述

我有一个 DAG,其中有多个任务排队成简单而直接的依赖关系。

import datetime as dt

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from airflow.settings import log


def task1_cb(ds, **kwargs):
    log.info('Task1 Complete for date: %s' % kwargs.get('end_date'))


def task2_cb(ds, **kwargs):
    log.info('Task2 Complete for date: %s' % kwargs.get('end_date'))


def task3_cb(ds, **kwargs):
    log.info('Task3 Complete for date: %s' % kwargs.get('end_date'))


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'concurrency': 1,
    'retries': 0
}

dag = DAG(
    'sample_serial_dag',
    start_date=dt.datetime(2018,9,1),
    end_date=dt.datetime(2018,9,5),
    default_args=default_args,
    schedule_interval='@daily',
    catchup=True
)

task1 = PythonOperator(task_id='t1', provide_context=True, python_callable=task1_cb, dag=dag)
task2 = PythonOperator(task_id='t2', provide_context=True, python_callable=task2_cb, dag=dag)
task3 = PythonOperator(task_id='t3', provide_context=True, python_callable=task3_cb, dag=dag)

task1 >> task2 >> task3

我希望它能够赶上过去的日期(正在运行@daily)。我现在得到的是任务 1 运行 5 次以赶上 5 个截止日期,完成后转到任务 2,然后运行 ​​5 次,依此类推。执行流程如下:

Task1 Complete for date: 2018-09-01
Task1 Complete for date: 2018-09-02
Task1 Complete for date: 2018-09-03
Task1 Complete for date: 2018-09-04
Task1 Complete for date: 2018-09-05

Task2 Complete for date: 2018-09-01
Task2 Complete for date: 2018-09-02
Task2 Complete for date: 2018-09-03
Task2 Complete for date: 2018-09-04
Task2 Complete for date: 2018-09-05

Task3 Complete for date: 2018-09-01
Task3 Complete for date: 2018-09-02
Task3 Complete for date: 2018-09-03
Task3 Complete for date: 2018-09-04
Task3 Complete for date: 2018-09-05

图表中的任务 1 已完成所有过去的日期,然后继续执行任务 2

我想要的是以下内容:

执行流程如下:

Task1 Complete for date: 2018-09-01
Task2 Complete for date: 2018-09-01
Task3 Complete for date: 2018-09-01

Task1 Complete for date: 2018-09-02
Task2 Complete for date: 2018-09-02
Task3 Complete for date: 2018-09-02

Task1 Complete for date: 2018-09-03
Task2 Complete for date: 2018-09-03
Task3 Complete for date: 2018-09-03

Task1 Complete for date: 2018-09-04
Task2 Complete for date: 2018-09-04
Task3 Complete for date: 2018-09-04

Task1 Complete for date: 2018-09-05
Task2 Complete for date: 2018-09-05
Task3 Complete for date: 2018-09-05

标签: pythonairflow

解决方案


这种奇怪行为的原因是default_args设置depends_on_pastFalse。我从一些教程或示例代码中复制粘贴了它,但实际上并没有注意到它的作用。根据文档

depends_on_past (bool) – 设置为 true 时,任务实例将按顺序运行,同时依赖前一个任务的计划来成功。start_date 的任务实例被允许运行。

将其设置为 True 解决了我的问题。


推荐阅读