首页 > 解决方案 > 我能否以编程方式确定 Airflow DAG 是计划触发还是手动触发?

问题描述

我想创建一个片段,根据 DAG 是否已安排或是否手动触发来传递正确的日期。DAG 每月运行一次。DAG 根据上个月的数据生成报告(A SQL 查询)。

如果我按计划运行 DAG,我可以使用以下 jinja 代码段获取上个月的数据:

execution_date.month

鉴于 DAG 安排在上一期(上个月)结束时, execution_date 将正确返回上个月。但是在手动运行时,这将返回当前月份(执行日期将是手动触发的日期)。

我想写一个简单的宏来处理这种情况。但是,我找不到以编程方式查询 DAG 是否以编程方式触发的好方法。我能想到的最好办法是run_id从数据库中获取 (通过创建一个具有数据库会话的宏),检查是否run_id包含单词manual. 有没有更好的方法来解决这个问题?

标签: airflow

解决方案


tl;博士:你可以用DagRun.external_trigger.


我注意到在树视图中,有一个计划运行的大纲,但不是手动运行。那是因为后者已stroke-opacity: 0;应用于 CSS。

在 repo 中搜索这个,我发现Airflow 开发人员如何检测手动运行(5 岁的线,所以也应该在旧版本中工作):

.style("stroke-opacity", function(d) {return d.external_trigger ? "0": "1"})

搜索external_trigger将我们带到DagRun定义

因此,例如,如果您使用的是 Python 回调,则可以有这样的东西(可以在 DAG 中定义,也可以在单独的文件中定义):

def my_fun(context):
    if context.get('dag_run').external_trigger:
        print('manual run')
    else:
        print('scheduled run')

在您的Operator设置中,参数如下:

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    on_failure_callback=my_fun,
    dag=dag,
)

我已经测试过类似的东西并且它有效。

我认为你也可以做类似 if if {{ dag_run.external_trigger }}:- 但我没有测试过这个,我相信它只会在那个 DAG 的文件中工作。


推荐阅读