首页 > 解决方案 > Airflow 如何解析和存储 schedule_interval

问题描述

我正在开发一个需要 Airflow 作业的 schedule_intervals 的功能。我没有自己编写代码来解析 DAG 文件中的 cron 表达式,而是尝试在 Airflow 元数据数据库中查找解析的 schedule_interval 值,但无济于事。

有人可以给我一个指向 Airflow 如何解析 schedule_interval 表达式的指针(例如https://github.com/apache/incubator-airflow上的文件),以及它存储解析值的位置(如果存储值)?

编辑:

  1. 上面的 schedule_interval 表达式是 DAG 参数 schedule_interval,如下所示:

    dag = DAG('tutorial', default_args=default_args, schedule_interval='@daily')

根据这个文档页面,schedule_interval 可以是一个 cron 表达式、一个 datetime.timedelta 对象或像“@daily”这样的“预设”之一。因为 schedule_interval 可以采用多种形式,如果 Airflow 已经解析并存储了这些值,我不想重新发明轮子并编写代码来解析 schedule_interval 参数。

  1. 我正在构建一个系统,通过查询 Airflow 元数据数据库来定期检查所有 Airflow 作业并总结它们的状态。虽然不是绝对必要,但了解 schedule_interval 会很有用,因为它揭示了诸如每个 Airflow 作业、过去 24 小时内预计运行多少 dag 以及下一次 dag 运行时间等信息。

标签: airflow

解决方案


schedule_interval值不存储在任何地方,而是过程本身。气流通过或多或少地检查来确定何时创建新的运行,NOW() >= (MAX(execution_date, start_date)) + schedule_interval

如果您愿意,可以使用和方法以编程方式计算 Airflow 的execution_date值。airflow.models.DAG.following_scheduleairflow.models.DAG.previous_schedule

注意:Airflow 使用croniter包来计算关闭 cron 值。


推荐阅读