首页 > 解决方案 > 根据条件重新运行某些成功的气流任务

问题描述

例如,我有一个带有任务 A、B 和 C 的 Dag。目前 C 依赖于 A 和 B。问题是,我正在尝试找出一种方法来 1)在 A 或 B 成功后运行 C,这样C 中的某些列将被刷新。2)说A先完成,C执行一次成功。现在B完成了。我现在如何触发 C 再次运行?这是让 c 运行两次的理想行为。C 可能比 A 和 B 有 2 个以上的依赖项。什么是增量更新 C 并在满足更多条件后重新运行的方法?作业 A、B 和 C 是 Spark 作业或 Hive sql。非常感谢您的帮助。

标签: pythonpysparkhiveairflowairflow-scheduler

解决方案


  1. 在 A 或 B 成功后运行 C,以便刷新 C 中的某些列。2)说A先完成,C执行一次成功。现在B完成了。我现在如何触发 A 再次运行?

答案在触发规则中。我会考虑all_done在 C 任务中使用规则。这样,它将始终在 A 和 B 之后运行(无论结果如何)。但是,如果所有任务都失败了,这可能需要调整任务 C 以使其正常工作。

我现在如何触发 A 再次运行?C 可能比 A 和 B 有 2 个以上的依赖项。什么是增量更新 C 并在满足更多条件后重新运行的方法?

为此,从 Airflow Web UI:

  1. 将任务 C 标记为失败
  2. 使用选项“上游”和“失败”清除任务 C:

在此处输入图像描述

这应该重新运行所有失败的任务(A 或 B 或 C 上游的任何其他任务)以及 C(因为我们将其标记为失败)。


推荐阅读