首页 > 解决方案 > Airflow - 根据假期日历在不同时间运行 dag

问题描述

我想根据假期日历在不同时间开始 Airflow 工作。要求如下。

  1. DAG 正常运行时间为下午 5 点,但节假日上午 9 点运行。周末根本不运行。
  2. 除了时间,工作是相同的
  3. 要确定是否是假期,我们将查询数据库。

schedule_interval用来指定开始时间,例如正常一天的“0 17 * * 1-5”。我的第一次尝试是schedule_interval根据日历查询的结果进行更改。像这样

with DAG(
    dag_id='jobname',
    default_args=args,
    schedule_interval='0 9 * * 1-5` if isholiday() else '0 17 * * 1-5'
    catchup=False,
) as dag:
    ## Make python operator to run my task

whereisholiday()获取当前日期,并检查数据库以查看是否是假期。

由于几个原因,这是不可取的。最大的原因是,当 DAG 本身被解析时,这段代码被调用,Airflow 在后台每隔一段时间重复执行该操作。我实际上有很多工作需要这样做——所以将它乘以工作数量,我每分钟要进行许多数据库查询。我们的 DBA 不会很高兴看到这么多相同的查询。我也没有简单的方法来缓存假期数据库查询的结果,因为每次 Airflow 解析 DAG 时,它都会在不同的进程中运行。

下一个明显的解决方案是为假期设置一个单独的 DAG。

 with DAG(
    dag_id='jobname-holiday',
    default_args=args,
    schedule_interval='0 9 * * 1-5'
    catchup=False,
) as dag:

该作业总是在上午 9 点被调用。在任务本身内部,我可以运行该isholiday()函数,如果这是正常的一天,我会跳过。当然,我仍然需要在下午 5 点运行正常的 DAG。我会做完全相反的检查isholiday()。所以如果是假期我会跳过

这行得通,但现在我基本上将 DAG 的数量乘以 2,因为我有一个在假期运行的平行作业世界。此外,这些“假期”工作每天都在运行,尽管大多数时候它们都失败了。这似乎是额外的 dagrun,额外的日志输出...... GUI 上的额外空间。额外的一切。

另一种解决方案是每天运行两次作业 - 一次在早期,一次在后期。在我的任务中,我会检查当前时间和日历。如果时间还早,今天是假期,我继续。或者,如果时间很晚,而今天是正常的一天,我会继续。其他一切都会导致跳过。

 with DAG(
    dag_id='jobname',
    default_args=args,
    schedule_interval='0 9,17 * * 1-5'
    catchup=False,
) as dag:


def task():
    if isholiday() and current_time_is('09:00'):
        print ('ok. holiday and in early run')
    elif not isholiday() and current_time_is('17:00'):
        print ('ok. regular day and normal run')
    else:
        print ('do not run')
        raise 

这行得通。但是,它导致 DAG 运行的数量翻了一番。在 Airflow GUI 中,我会看到两倍的列数,其中一半总是失败。

我想我不是唯一一个有这个问题的人。似乎我的解决方案都不是很好。过去其他人是如何解决的?

提前致谢。

标签: airflowairflow-scheduler

解决方案


我们正在 Airflow 2.2 中解决这个问题(可能会在〜月发布)。

您将有可能定义非常灵活的时间表:

https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-39+Richer+scheduler_interval


推荐阅读