airflow - 我能否以编程方式确定 Airflow DAG 是计划触发还是手动触发?
问题描述
我想创建一个片段,根据 DAG 是否已安排或是否手动触发来传递正确的日期。DAG 每月运行一次。DAG 根据上个月的数据生成报告(A SQL 查询)。
如果我按计划运行 DAG,我可以使用以下 jinja 代码段获取上个月的数据:
execution_date.month
鉴于 DAG 安排在上一期(上个月)结束时, execution_date 将正确返回上个月。但是在手动运行时,这将返回当前月份(执行日期将是手动触发的日期)。
我想写一个简单的宏来处理这种情况。但是,我找不到以编程方式查询 DAG 是否以编程方式触发的好方法。我能想到的最好办法是run_id
从数据库中获取 (通过创建一个具有数据库会话的宏),检查是否run_id
包含单词manual
. 有没有更好的方法来解决这个问题?
解决方案
tl;博士:你可以用DagRun.external_trigger
.
我注意到在树视图中,有一个计划运行的大纲,但不是手动运行。那是因为后者已stroke-opacity: 0;
应用于 CSS。
在 repo 中搜索这个,我发现Airflow 开发人员如何检测手动运行(5 岁的线,所以也应该在旧版本中工作):
.style("stroke-opacity", function(d) {return d.external_trigger ? "0": "1"})
搜索external_trigger
将我们带到DagRun
定义。
因此,例如,如果您使用的是 Python 回调,则可以有这样的东西(可以在 DAG 中定义,也可以在单独的文件中定义):
def my_fun(context):
if context.get('dag_run').external_trigger:
print('manual run')
else:
print('scheduled run')
在您的Operator
设置中,参数如下:
t1 = BashOperator(
task_id='print_date',
bash_command='date',
on_failure_callback=my_fun,
dag=dag,
)
我已经测试过类似的东西并且它有效。
我认为你也可以做类似 if if {{ dag_run.external_trigger }}:
- 但我没有测试过这个,我相信它只会在那个 DAG 的文件中工作。
推荐阅读
- json - swift json帮助在表格视图控制器上显示数据
- r - 使用 raster 和 sp 库在 R 中将坐标的 data.frame 从 Lambert93 重新投影到 WGS83
- google-api - GKE 集群 API 中区域和位置端点之间的区别
- python - Django send_mail 引发错误“‘ascii’编解码器无法编码字符”
- javascript - 出现错误:`ReferenceError: Cannot access 'imported const' before initialization` in react
- c++ - 如何获取向量c ++的列表
- html - Html 基础 - 在同一行与不同行编写的控件
- lua - Lua 回调的返回值
- typescript - TypeScript 类中的 # 符号是什么意思?
- c++ - 比较 std::ostream 是否为 std::cout ("no match for 'operator=='")