Apache Airflow DAG does not call on_success_callback and on_failure_callback

Problem description

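I want to customize my DAG so that it sends an email when a run fails or succeeds. I am trying to use on_success_callback and on_failure_callback in the DAG constructor, but they are never called for the DAG. The same callbacks do work on the DummyOperator I put in the DAG.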

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

from utils import get_report_operator, DagStatus


TEST_DAG_NAME='test_dag'
TEST_DAG_REPORT_SUBSCRIBERS = ['MY_EMAIL']

def send_success_report(context):
    subject = 'Airflow report: {0} run success'.format(TEST_DAG_NAME)
    email_operator = get_report_operator(subject, TEST_DAG_REPORT_SUBSCRIBERS, TEST_DAG_NAME, DagStatus.SUCCESS)
    email_operator.execute(context)

def send_failed_report(context):
    subject = 'Airflow report: {0} run failed'.format(TEST_DAG_NAME)
    email_operator = get_report_operator(subject, TEST_DAG_REPORT_SUBSCRIBERS, TEST_DAG_NAME, DagStatus.FAILED)
    email_operator.execute(context)


dag = DAG(dag_id=TEST_DAG_NAME,
          schedule_interval=None,
          start_date=datetime(2019,6,6),
          on_success_callback=send_success_report,
          on_failure_callback=send_failed_report)

DummyOperator(task_id='task',
              on_success_callback=send_success_report,
              on_failure_callback=send_failed_report,
              dag = dag)

I also wrote a small helper on top of Airflow's EmailOperator to send the report. I don't think the error is in this part, but here it is anyway.

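import os
from datetime import datetime
from enum import Enum

from airflow import DAG
from airflow.operators.email_operator import EmailOperator


class DagStatus(Enum):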
    SUCCESS = 0
    FAILED = 1


def get_report_operator(sbjct, to_lst, dag_id, dag_status):
    status = 'SUCCESS' if dag_status == DagStatus.SUCCESS else 'FAILED'
    status_color = '#87C540' if dag_status == DagStatus.SUCCESS else '#FF1717'
    with open(os.path.join(os.path.dirname(__file__), 'airflow_report.html'), 'r', encoding='utf-8') as report_file:
        report_mask = report_file.read()
    report_text = report_mask.format(dag_id, status, status_color)
    tmp_dag = DAG(dag_id='tmp_dag', start_date=datetime(year=2019, month=9, day=12), schedule_interval=None)
    return EmailOperator(task_id='send_email',
                    to=to_lst,
                    subject=sbjct,
                    html_content=report_text.encode('utf-8'),
                    dag = tmp_dag)

What am I doing wrong?

Tags: python, callback, airflow

Solution


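Instead, put on_failure_callback (and on_success_callback) in the default_args dictionary and pass that dictionary to the DAG. Every argument in default_args is applied to all of the DAG's operators, so the callbacks then fire for each task rather than once for the DAG run. So far, this is the only way to apply common arguments to all operators in a DAG.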

dag = DAG(dag_id=TEST_DAG_NAME,
          schedule_interval=None,
          start_date=datetime(2019,6,6),
          default_args={
              'on_success_callback': send_success_report,
              'on_failure_callback': send_failed_report
          })
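
Because callbacks supplied through default_args run for every task, it can help to have the callback say which task triggered it. The following is a minimal sketch, not part of the original answer. It assumes the context dictionary passed to the callback contains a task_instance entry exposing dag_id and task_id, as Airflow task contexts do. build_report_subject is a hypothetical helper introduced here for illustration, and the print call stands in for the real email step.

```python
from types import SimpleNamespace


def build_report_subject(context, status):
    """Build an email subject naming the DAG and the task that fired the callback."""
    # Assumption: the callback context carries the running task instance.
    ti = context["task_instance"]
    return "Airflow report: {0}.{1} run {2}".format(ti.dag_id, ti.task_id, status)


def send_success_report(context):
    subject = build_report_subject(context, "success")
    print(subject)  # placeholder: swap in the actual email-sending call
    return subject


def send_failed_report(context):
    subject = build_report_subject(context, "failed")
    print(subject)  # placeholder: swap in the actual email-sending call
    return subject


# Stand-in context so the helper can be tried without a running scheduler.
fake_context = {"task_instance": SimpleNamespace(dag_id="test_dag", task_id="task")}
print(build_report_subject(fake_context, "failed"))
```

With the stand-in context, the last line prints "Airflow report: test_dag.task run failed". In a real DAG, these two functions would be the values passed under 'on_success_callback' and 'on_failure_callback' in default_args.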
