airflow - Airflow 如何解析和存储 schedule_interval
问题描述
我正在开发一个需要 Airflow 作业的 schedule_intervals 的功能。我没有自己编写代码来解析 DAG 文件中的 cron 表达式,而是尝试在 Airflow 元数据数据库中查找解析的 schedule_interval 值,但无济于事。
有人可以给我一个指向 Airflow 如何解析 schedule_interval 表达式的指针(例如https://github.com/apache/incubator-airflow上的文件),以及它存储解析值的位置(如果存储值)?
编辑:
- 上面的 schedule_interval 表达式是 DAG 参数 schedule_interval,如下所示:
dag = DAG('tutorial', default_args=default_args, schedule_interval='@daily')
根据这个文档页面,schedule_interval 可以是一个 cron 表达式、一个 datetime.timedelta 对象或像“@daily”这样的“预设”之一。因为 schedule_interval 可以采用多种形式,如果 Airflow 已经解析并存储了这些值,我不想重新发明轮子并编写代码来解析 schedule_interval 参数。
- 我正在构建一个系统,通过查询 Airflow 元数据数据库来定期检查所有 Airflow 作业并总结它们的状态。虽然不是绝对必要,但了解 schedule_interval 会很有用,因为它揭示了诸如每个 Airflow 作业、过去 24 小时内预计运行多少 dag 以及下一次 dag 运行时间等信息。
解决方案
该schedule_interval
值不存储在任何地方,而是过程本身。气流通过或多或少地检查来确定何时创建新的运行,NOW() >= (MAX(execution_date, start_date)) + schedule_interval
如果您愿意,可以使用和方法以编程方式计算 Airflow 的execution_date
值。airflow.models.DAG.following_schedule
airflow.models.DAG.previous_schedule
注意:Airflow 使用croniter
包来计算关闭 cron 值。
推荐阅读
- android - 如何更改 Android Project 中的所有文本文件?
- python - ET 中的评论:不仅仅是评论标签
- python - Python rarfile 包:BadRarFile
- arrays - Dictionary(grouping: , by: ) - 作为变量传递的谓词闭包
- python - 基于值的彩色条形图
- symfony - symfony liip 想象水印不工作
- mongodb - 使用 insertMany 命令恢复 bson 文档
- javascript - 是否可以使用 sequelize 在迁移中更新 postgresql 数据库中的现有数据
- vscode-settings - 如何隐藏在 Vs-code 中编译“c”或“c++”语言代码后生成的没有扩展名的文件
- c# - sql select语句在asp.net中返回空引用错误