首页 > 解决方案 > 有没有一种简单的方法可以在 Airflow 中跳过整个 DAG 运行但仍允许未来运行?

问题描述

我目前有一个每天中午运行的 DAG。我现在希望它在每个星期五和每个月的 10 号运行,以节省一些成本,但仍然安排它每天运行,以便我可以轻松地在任何一天返回并运行 dag。(这背后的原因是有时我需要这个 dag 在特定日期而不是 10 号和每个星期五产生的数据,但我不会提前知道何时需要)。

我的一个想法是使用 BranchPythonOperator 检查是否满足条件,如果满足将触发所有需要启动的任务,否则它会触发一个虚拟任务并跳过其他所有任务。这样做的主要问题是这个 DAG 非常庞大并且包含分布在不同文件中的许多任务,所以我正在努力确保一切都依赖于这个 BranchPythonOperator 任务。(关于这个的一个问题,如果我只是让 BranchPythonOperator 独立,不依赖或依赖它,返回任务的名称是否仍然有效?)

我认为必须有一种更简单的方法来跳过整个 DAG 运行,如果不满足条件而不阻止我将来在该日期运行。

用示例编辑:

partition_sensor_1 -->
partition_sensor_2 -->
.                         first_dag_task
.
.
partition_sensor_15 -->

所以在这个例子中,我想确保跳过所有这些分区传感器和 DAG 的其余部分。我是否必须将每个分区传感器列为依赖于 Branch python 运算符?这些分区传感器的设置方式通常是在不同的文件中创建的,因此根据 Branching 运算符来设置它们并不容易。希望这是有道理的

标签: pythondirected-acyclic-graphsairflow-schedulerairflow

解决方案


我认为你的想法是正确的。但是,您需要在 DAG 的第一个任务中连接 BranchPythonOperator。

所以现在你有:

first_task >> of >> your >> old >> dag >> definition

您首先要定义您的 BranchPythonOperator,然后按如下方式更改 dag:

branch_task >> [dummy_task, first_task]
first_task >> of >> your >> old >> dag >> definition

确保 branch_task 在满足条件时返回 first_task 的 task_id,否则返回虚拟任务的任务 id。


推荐阅读