airflow - Apache 气流宏获取最后 dag 运行执行时间
问题描述
我认为这里prev_execution_date
列出的宏会让我知道最后一次 DAG 运行的执行日期,但是查看源代码似乎只能根据 DAG 时间表获得最后日期。
prev_execution_date = task.dag.previous_schedule(self.execution_date)
当 DAG 未按计划运行时,是否有任何方法可以通过宏获取 DAG 的执行日期?
解决方案
是的,您可以为此定义自己的自定义宏,如下所示:
# custom macro function
def get_last_dag_run(dag):
last_dag_run = dag.get_last_dagrun()
if last_dag_run is None:
return "no prev run"
else:
return last_dag_run.execution_date.strftime("%Y-%m-%d")
# add macro in user_defined_macros in dag definition
dag = DAG(dag_id="my_test_dag",
schedule_interval='@daily',
user_defined_macros={
'last_dag_run_execution_date': get_last_dag_run
}
)
# example of using it in practice
print_vals = BashOperator(
task_id='print_vals',
bash_command='echo {{ last_dag_run_execution_date(dag) }}',
dag=dag
)
请注意,dag.get_last_run() 只是 Dag 对象上可用的众多函数之一。这是我找到它的地方:https ://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/models.py#L3396
您还可以调整日期格式的字符串格式,以及如果没有以前的运行,您想要输出的内容。
推荐阅读
- javascript - 触发动态和相关选择选项的更改
- python - matplotlib 在主图中缩放绘图窗口?
- python - Plotly:使用 plotly express 行时如何在 hoverinfo 中格式化日期?
- php - 有时用户如何能够上传 png 格式?
- windows - Windows 2000 cdplayer.exe 对光盘元数据的响应格式是什么?
- swift - AudioKit Playground 构建失败
- flutter - Flutter 中适合游戏的 Widget 和/或布局
- haskell - Haskell 替换现有列表中的项目
- python - python numpy:以不同的值滚动列
- sql - 基于数量值的重复行