首页 > 解决方案 > 如何在气流中的 SLA 上设置时间对象而不是 timedelta?

问题描述

我正在尝试在我的气流 DAG 中实施 SLA。

我知道 SLA 是如何工作的,你设置了一个 timedelta 对象,如果任务在这段时间内没有完成,它会发送一封电子邮件并通知任务尚未完成。

我想要一些类似的功能,但我不想给出持续时间,而是想在 SLA 中设置特定时间。例如,如果任务由于上午 8:00 未完成,它会发送电子邮件并通知经理。像这样的东西:

'sla': time(hour=8, minute=0, second=0)

我搜索了很多,但没有找到。

有没有针对这个特定问题的解决方案?或除 SLA 之外的任何其他解决方案?

提前致谢。

标签: pythonpython-3.xairflowairflow-scheduler

解决方案


SLAparam ofBaseOperator需要一个datetime.timedelta对象,所以那里没有什么可做的。考虑到SLA表示计划周期结束后的时间增量。文档中的示例假设每天安排一个 DAG:

例如,如果您将 SLA 设置为 1 小时,如果 2016-01-01 实例尚未成功,调度程序将在 2016-01-02 凌晨 1:00 后不久发送电子邮件。

关键是,它始终是计划周期的时间增量,这不是您要寻找的。

所以我认为你应该采取另一种方法,比如在你需要的时候安排你的 DAG,执行你想要的任务,然后添加一个传感器操作符来检查你正在寻找的条件是否得到满足。有几种类型的传感器,具体取决于您可以从中选择的上下文。

另一种选择可能是,创建一个新的DAG ,专门用于检查您在原始DAG中执行的任务是否成功执行,并采取相应措施(例如,发送电子邮件等)。为此,您可以使用ExternalTaskSensor,在线查看有关如何实现它的教程,尽管避免文档中所述的交叉 DAG 依赖关系可能更简单。

希望这可以为您指明正确的方向。


推荐阅读