首页 > 解决方案 > 气流DAG调度月的最后一天-n天

问题描述

我想安排我的 dag 在一个月的最后一天前 3 天运行,所以 2 月我的 dag 应该在 25 日运行,而 3 月的 dag 应该在 28 日运行。关于如何安排这个的任何想法?

谢谢

标签: python-3.xairflowairflow-scheduler

解决方案


对于气流 < 2.2.0:

只有当您可以在单个 cron 表达式中“说出来”时,您才能安排 DAG。如果您的调度愿望不适合 cron 表达式,那么您不能将其设置为开箱即用。但是,您可以找到一个与您想要的足够接近的 cron 表达式(0 0 25-31 * *- 从 28 到 31 的每一天),并将 aShortCircuitOperator放在 DAG 的开头,以验证日期是否实际上是 3 天前月底。如果日期匹配,它将继续执行下游任务,如果日期不匹配,它将跳过下游任务:

import calendar
from datetime import datetime, date, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import ShortCircuitOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 8, 21)

}


def check_if_last_day_of_month(execution_date):
    #  calendar.monthrange return a tuple (weekday of first day of the
    #  month, number
    #  of days in month)
    run_date = datetime.fromtimestamp(execution_date.timestamp())
    last_day_of_month = calendar.monthrange(run_date.year, run_date.month)[1]
    # check if date is 3 days behind the last day of the month
    if run_date == date(run_date.year, run_date.month, last_day_of_month) - timedelta(days=3):
        return True
    return False


with DAG(
    dag_id='short_example',
    schedule_interval="@once",
    default_args=default_args,
) as dag:
    first = ShortCircuitOperator(
        task_id='verify_date',
        python_callable=check_if_last_day_of_month
    )

    second = DummyOperator(task_id='task')

    first >> second

示例运行2021-01-30

在此处输入图像描述

示例运行2021-01-28

在此处输入图像描述

注意:确保您正在比较您感兴趣的日期。在示例中,我比较execution_date了 DAG 的。

对于 Airflow >= 2.2.0:(当前测试版 2.2.0b2)

AIP-39 更丰富的 scheduler_interval可用。您可以为调度定义自己的时间表。您可以在PR中阅读此功能的文档。


推荐阅读