首页 > 解决方案 > 获取最近一次成功执行 DAG 的日期

问题描述

我希望在 Airflow 中创建一个转换,并且我想确保从我的源中获取自上次运行 DAG 以更新我的目标表以来的所有数据。为此,我希望能够获得最近一次成功的执行。

我发现了这一点:Apache 气流宏获取最后一个 dag 运行执行时间,这让我到达最终目标的某个地方,但是,这只会得到最后一次 DAG 执行,无论它是否成功。

SELECT col1, col2, col3
FROM schema.table
WHERE table.updated_at > '{{ last_dag_run_execution_date(dag) }}';

如果执行失败(由于连接或类似原因),last_dag_run_execution_date(dag) 将更新,但我们错过了前一次 DAG 运行的执行。

理想情况下,这将拉动最近的非失败执行。或者,如果有人有任何想法,我该如何满足这一点,请告诉我

标签: airflow

解决方案


我最终将引用问题中的函数更改为使用 latest_execution_date,这是 Airflow 中的预定义宏,如下所示:

def get_last_dag_run(dag):
    last_dag_run = dag.latest_execution_date
    if last_dag_run is None: 
        return '2013-01-01'
    else:
        return last_dag_run

目前似乎正在为我工​​作。


推荐阅读