python - 在任务成功/失败时重新安排 DAG
问题描述
考虑一个非常简单的 Apache Airflow DAG:
FileSensor -> PythonOperator
在哪里FileSensor
等待一些文件出现(相对较短poke_interval
)并PythonOperator
处理这些文件。此 DAG 计划@once
无限期运行 - 我如何将其设置为重新安排以PythonOperator
在它成功(或失败)之后再次运行?
解决方案
总的来说,我认为 Elad 的建议可能会奏效,但我认为这是一种不好的做法。DAG 在设计(和名称)上是非循环的,因此在其中创建任何类型的循环都可能导致其行为异常。
同样基于 Airflow 文档,如果您打算使用外部 dag 触发器,则应将 dag 计划设置为 None。就我个人而言,我不确定它是否一定会破坏某些东西,但它肯定会给你带来你意想不到的输出。如果出现问题,您稍后可能会花费更长的时间来调试它。
恕我直言,更好的方法是让您尝试并重新考虑您的设计。如果您需要在失败时重新安排 dag,您可以利用传感器的重新安排模式https://www.astronomer.io/guides/what-is-a-sensor。不知道为什么要在成功时重新运行它,如果源中有多个文件的情况,我会说宁愿在你的 dag 脚本中创建具有可变参数和 for 循环的多个传感器。
推荐阅读
- r - 我想将数据框中的多个列与向量源隔离,并使用 R 将它们的数据类型从字符转换为数字
- mysql - 使用大数据更快地执行此查询的任何方法
- python-3.x - Djnago 模板 - 具有多个参数的 URL
- error-handling - 方案:为什么我会收到此错误:断言违规:错误类型的参数 [car (car #{Unspecific})
- c# - 异步更新多行立即更新1/4的行然后等待
- python - 在 Django 中,我的表单每次刷新时都会重新提交,我知道我可以通过重定向它来解决这个问题,但我也想将数据传递给我的模板
- python - 树视图中的行选择
- java - 将 JFrame 大小动态更改为 JTable 内容
- php - 多个mysqli对象互相覆盖
- python - 如何对齐两个 dask 数据帧的分区