python - 有没有一种简单的方法可以在 Airflow 中跳过整个 DAG 运行但仍允许未来运行?
问题描述
我目前有一个每天中午运行的 DAG。我现在希望它在每个星期五和每个月的 10 号运行,以节省一些成本,但仍然安排它每天运行,以便我可以轻松地在任何一天返回并运行 dag。(这背后的原因是有时我需要这个 dag 在特定日期而不是 10 号和每个星期五产生的数据,但我不会提前知道何时需要)。
我的一个想法是使用 BranchPythonOperator 检查是否满足条件,如果满足将触发所有需要启动的任务,否则它会触发一个虚拟任务并跳过其他所有任务。这样做的主要问题是这个 DAG 非常庞大并且包含分布在不同文件中的许多任务,所以我正在努力确保一切都依赖于这个 BranchPythonOperator 任务。(关于这个的一个问题,如果我只是让 BranchPythonOperator 独立,不依赖或依赖它,返回任务的名称是否仍然有效?)
我认为必须有一种更简单的方法来跳过整个 DAG 运行,如果不满足条件而不阻止我将来在该日期运行。
用示例编辑:
partition_sensor_1 -->
partition_sensor_2 -->
. first_dag_task
.
.
partition_sensor_15 -->
所以在这个例子中,我想确保跳过所有这些分区传感器和 DAG 的其余部分。我是否必须将每个分区传感器列为依赖于 Branch python 运算符?这些分区传感器的设置方式通常是在不同的文件中创建的,因此根据 Branching 运算符来设置它们并不容易。希望这是有道理的
解决方案
我认为你的想法是正确的。但是,您需要在 DAG 的第一个任务中连接 BranchPythonOperator。
所以现在你有:
first_task >> of >> your >> old >> dag >> definition
您首先要定义您的 BranchPythonOperator,然后按如下方式更改 dag:
branch_task >> [dummy_task, first_task]
first_task >> of >> your >> old >> dag >> definition
确保 branch_task 在满足条件时返回 first_task 的 task_id,否则返回虚拟任务的任务 id。
推荐阅读
- mysql - 是否可以将应用程序内的所有数据库交互限制为读取?
- python - 使用 BCP 将 CSV 文件导入 SQL Server 表时如何格式化日期字段?
- ios - iOS13 崩溃-[UIWindowScene _enumerateWindowsIncludingInternalWindows:onlyVisibleWindows:asCopy:stopped:withBlock:]?
- javascript - 如何返回存储在 cookie 中的 URL
- css - 使用 Bash 脚本查找和替换 css 文件中的文件名
- java - 使用流而不是嵌套的 for 循环
- android - 我怎样才能使 ProgressBar 角直而不圆?
- html - 使用 Thymeleaf 和 Spring boot 从下拉列表中提取所选值
- python - 如何比较不在另一个二维列表中的二维列表?
- pandas - 熊猫掩码数据框分配二维数组