首页 > 解决方案 > 如何在 sla_miss_callback 函数中获取上下文对象

问题描述

我能够在 Apache Airflow 中成功实现和测试on_success_callbackon_failure_callback,包括使用上下文对象成功地将参数传递给它们。但是我无法成功实施sla_miss_callback。通过浏览不同的在线资源,我发现传递给这个函数的参数是

dag,task_list,blocking_task_list,slas,blocking_tis

但是,与成功/失败回调不同的sla_miss_callback不会在其参数列表中获取上下文对象,如果我尝试运行多组运算符,如 Python,Bash 运算符它们会失败,并且调度程序会抱怨没有将上下文传递给执行函数。

SLA MISS CALLBACK 函数的调度程序错误

我尝试查看其他在线资源,并且仅在一个 ( https://www.rea-group.com/blog/watching-the-watcher/ ) 中发现我们可以使用 self 对象提取上下文对象。所以我将 self 附加到上述额外的 5 个参数中,但它对我不起作用。我想知道如何检索上下文对象或将上下文对象传递给sla_miss_callback函数,不仅可以运行不同的运算符,还可以检索有关错过 SLA 的 dag 的其他详细信息

标签: pythonairflowairflow-scheduler

解决方案


似乎不可能将上下文字典传递给 SLA 回调(请参阅sla_miss_callback的源代码),但我找到了一种合理的解决方法来访问有关 dag-run 的一些其他信息,例如 dag_id、task_id 和 execution_date。您还可以使用任何应该可以正常工作的内置宏/参数。当我将SlackWebhookOperator用于我的其他回调时,我正在使用SlackWebhookHooksla_miss_callback。例如:

from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis, *args, **kwargs):
dag_id = slas[0].dag_id
task_id = slas[0].task_id
execution_date = slas[0].execution_date.isoformat()
hook = SlackWebhookHook(
    http_conn_id='slack_callbacks',
    webhook_token=BaseHook.get_connection('slack_callbacks').password,
    message=f"""
        :sos: *SLA has been missed*
        *Task:* {task_id}
        *DAG:* {dag_id}
        *Execution Date:* {execution_date}
        """
)
hook.execute()

推荐阅读