首页 > 解决方案 > Airflow - 处理 DAG 回调的正确方法

问题描述

我有一个DAG然后每当它成功或失败时,我希望它触发一个发布到 Slack 的方法。

DAG args的如下:

default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}

以及DAG定义本身:

dag = DAG(
    dag_id = dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
      )

但是当我检查 Slack 时,每分钟有超过 100 条消息,好像正在评估每个调度程序的心跳,并且对于每个日志,它确实运行了成功和失败方法,就好像它对同一个任务实例有效并且不适用于同一个任务实例(不是美好的)。

我应该如何正确使用on_failure_callbackandon_success_callback来处理 dags 状态并调用自定义方法?

标签: pythonairflowairflow-scheduler

解决方案


它创建消息的原因是因为当您定义您的 时default_args,您正在执行函数。您只需传递函数定义而不执行它。

由于该函数有一个参数,它会变得有点棘手。您可以定义两个部分函数或定义两个包装函数。

所以你可以这样做:

from functools import partial

success_msg = partial(slack.slack_message, happy_message);
failure_msg = partial(slack.slack_message, sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}

或者

def success_msg():
    slack.slack_message(happy_message);

def failure_msg():
    slack.slack_message(sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}

在任何一种方法中,请注意函数定义是如何传递的,failure_msgsuccess_msg不是它们在执行时给出的结果。


推荐阅读