首页 > 解决方案 > Airflow - 从 dag 上下文回调中解析任务 ID

问题描述

起初使用dag callback(on_failure_callbackon_success_callback) 时,我认为它会在完成时触发successorfail状态dag(正如它在 dag 中定义的那样)。但是它似乎在 everytask instance而不是实例化dag run,所以如果一个 DAG 有 N 个任务,它会触发这些回调 N 次。

我正在尝试捕获任务 ID,然后发送给 slack。阅读另一个相关问题,我想出了以下内容:

def success_msg(context):
    slack.slack_message(context['task_instance']); #send task-id to slack

def failure_msg(context):
    slack.slack_message(context['task_instance']); #send task-id to slack

default_args = {
    [...]
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
    [...]
}

但它失败了,我应该如何解析上下文变量并允许获取任务 ID?

标签: airflow

解决方案


您可以从上下文中使用任务对象访问任务。

context['task']应该是执行此操作的适当方法。要获取任务名称,请使用task_id

context['task'].task_id

要查找上下文中可用的更多对象,您可以在此处浏览列表:https ://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html


推荐阅读