python - DAG 是否可以检测到 Airflow 中特定日期的首次运行?
问题描述
我每 30 分钟运行一次 DAG。
假设这是 DAG(为简单起见使用虚拟运算符):
dag = DAG(
dag_id='My_dag',
default_args=args,
schedule_interval=timedelta(minutes=30),
max_active_runs=1,
catchup=False,
)
start = DummyOperator(task_id='start_task', dag=dag)
to_do = DummyOperator(task_id='to_do_task ', dag=dag)
end = DummyOperator(task_id='end_task ', dag=dag)
start >> to_do >> end
现在,每天一次,我想将另一个 Operator 添加到工作流中,以便仅在当天的第一次运行时执行。
说它是:
once = DummyOperator(task_id='once_task ', dag=dag)
start >> once
这意味着这once
将每 24 小时执行一次,其余的应该被跳过。
我不能这样做,PythonBranchOperator
因为我不能这样做:
if execution_date == midnigt
因为我无法知道第一次执行的时间。它可以是 00:01,也可以是 00:17 等。
有没有办法可以检查这是否是每个 execution_date 的第一次运行?我听起来像是TimeSensor
一种事情,但我找不到如何使用文档来做到这一点。是否可以戳相同的 DAG?
解决方案
您可以检查上一个执行日期(prev_ds
宏)并将其与当前执行日期(ds
宏)进行比较BranchPythonOperator
。例子:
start = DummyOperator(task_id='start_task', dag=dag)
end = DummyOperator(task_id='end_task ', dag=dag)
once = DummyOperator(task_id='once_task', dag=dag)
dummy_task_id_that_does_nothing = DummyOperator(task_id='dummy_task_id_that_does_nothing', dag=dag)
def check_if_task_already_ran(**context):
ds = context.get('ds')
prev_ds = context.get('prev_ds')
print(context)
print(ds)
print(prev_ds)
if prev_ds == ds:
return 'dummy_task_id_that_does_nothing' #task_id
else:
return 'once_task' # Task that would just be executed once in a day
compare_ds = BranchPythonOperator(
task_id='compare_ds',
provide_context=True,
python_callable=check_if_task_already_ran,
dag=dag)
start >> compare_ds
compare_ds >> once >> end
compare_ds >> dummy_task_id_that_does_nothing >> end
推荐阅读
- c - 将指向数组的指针作为参数传递给函数
- reactjs - React JS 购物车添加功能
- javascript - 将路由动态导入 Vue 路由器
- python - 定时器在后台运行和调用文件
- vue.js - NUXTJS + Vuetify - SCSS 中的颜色
- java - Android Firebase 实时数据库 orderByChild 不起作用
- mysql - MySQL删除多个表上所有匹配记录的连接
- swift - “未能获得匹配的快照:评估 UI 查询时超时。” 在 XCUITest
- javascript - 如何使用棘轮通过单个 websocket 发送请求发送对象数组?
- python - 翻译 Python 字典中的数字代码值