首页 > 解决方案 > DAG 是否可以检测到 Airflow 中特定日期的首次运行?

问题描述

我每 30 分钟运行一次 DAG。

假设这是 DAG(为简单起见使用虚拟运算符):

dag = DAG(
    dag_id='My_dag',
    default_args=args,
    schedule_interval=timedelta(minutes=30),
    max_active_runs=1,
    catchup=False,
)
start = DummyOperator(task_id='start_task', dag=dag)
to_do = DummyOperator(task_id='to_do_task ', dag=dag)
end = DummyOperator(task_id='end_task ', dag=dag)

start >> to_do >> end

现在,每天一次,我想将另一个 Operator 添加到工作流中,以便仅在当天的第一次运行时执行。

说它是:

once = DummyOperator(task_id='once_task ', dag=dag)
start >> once

这意味着这once将每 24 小时执行一次,其余的应该被跳过。

我不能这样做,PythonBranchOperator因为我不能这样做:

if execution_date == midnigt

因为我无法知道第一次执行的时间。它可以是 00:01,也可以是 00:17 等。

有没有办法可以检查这是否是每个 execution_date 的第一次运行?我听起来像是TimeSensor 一种事情,但我找不到如何使用文档来做到这一点。是否可以戳相同的 DAG?

标签: pythonairflow

解决方案


您可以检查上一个执行日期(prev_ds宏)并将其与当前执行日期(ds宏)进行比较BranchPythonOperator。例子:

start = DummyOperator(task_id='start_task', dag=dag)
end = DummyOperator(task_id='end_task ', dag=dag)
once = DummyOperator(task_id='once_task', dag=dag)
dummy_task_id_that_does_nothing = DummyOperator(task_id='dummy_task_id_that_does_nothing', dag=dag)

def check_if_task_already_ran(**context):
    ds = context.get('ds')
    prev_ds = context.get('prev_ds')

    print(context)
    print(ds)
    print(prev_ds)

    if prev_ds == ds:
        return 'dummy_task_id_that_does_nothing' #task_id
    else:
        return 'once_task'    # Task that would just be executed once in a day


compare_ds = BranchPythonOperator(
    task_id='compare_ds',
    provide_context=True,
    python_callable=check_if_task_already_ran,
    dag=dag)


start >> compare_ds
compare_ds >> once >> end
compare_ds >> dummy_task_id_that_does_nothing >> end

推荐阅读