google-cloud-platform - 气流调度,重新运行失败的任务
问题描述
我对气流相当陌生,尽管我已经能够为读取(数据存储)和写入(大查询)编写代码。我无法安排我的工作/任务。我想每 2 小时运行一次作业,从源中读取前 2 小时的数据。现在,如果任务失败,我想手动重试它,但是对于它本来要运行的特定 2 小时。我如何做到这一点?我有几件事:
- 存储计划运行的作业 ID 和时间范围。在重试时,我可以从 sqlite 或其他数据库中读取它。
- Airflow 有一个内置变量,它指向该作业的执行时间,我可以在我的代码中使用它。
我应该考虑其他选择吗?或以上任何一项?
解决方案
我假设您在任务中使用当前日期时间(例如 now()),对吗?
好的做法是使用execution_date
Airflow Context 中的值,而不是在您的操作员中调用 datetime.now(),因为execution_date
即使您重新执行 DAG/任务,计划作业的 也不会更改。