python - Apache Airflow DAG 不调用 on_success_callback 和 on_failure_callback
问题描述
我想自定义我的 DAG 以在电子邮件失败或成功时发送电子邮件。我正在尝试在 DAG 构造函数中使用 on_success_callback 和 on_failure_callback,但它不适用于 DAG。同时它适用于我放入 DAG 的 DummyOperator。
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from utils import get_report_operator, DagStatus
TEST_DAG_NAME='test_dag'
TEST_DAG_REPORT_SUBSCRIBERS = ['MY_EMAIL']
def send_success_report(context):
subject = 'Airflow report: {0} run success'.format(TEST_DAG_NAME)
email_operator = get_report_operator(subject, TEST_DAG_REPORT_SUBSCRIBERS, TEST_DAG_NAME, DagStatus.SUCCESS)
email_operator.execute(context)
def send_failed_report(context):
subject = 'Airflow report: {0} run failed'.format(TEST_DAG_NAME)
email_operator = get_report_operator(subject, TEST_DAG_REPORT_SUBSCRIBERS, TEST_DAG_NAME, DagStatus.FAILED)
email_operator.execute(context)
dag = DAG(dag_id=TEST_DAG_NAME,
schedule_interval=None,
start_date=datetime(2019,6,6),
on_success_callback=send_success_report,
on_failure_callback=send_failed_report)
DummyOperator(task_id='task',
on_success_callback=send_success_report,
on_failure_callback=send_failed_report,
dag = dag)
我还在 Airflow EmailOperator 上实现了一些插件来发送报告。我不认为这部分有这个错误,但仍然如此。
class DagStatus(Enum):
SUCCESS = 0
FAILED = 1
def get_report_operator(sbjct, to_lst, dag_id, dag_status):
status = 'SUCCESS' if dag_status == DagStatus.SUCCESS else 'FAILED'
status_color = '#87C540' if dag_status == DagStatus.SUCCESS else '#FF1717'
with open(os.path.join(os.path.dirname(__file__), 'airflow_report.html'), 'r', encoding='utf-8') as report_file:
report_mask = report_file.read()
report_text = report_mask.format(dag_id, status, status_color)
tmp_dag = DAG(dag_id='tmp_dag', start_date=datetime(year=2019, month=9, day=12), schedule_interval=None)
return EmailOperator(task_id='send_email',
to=to_lst,
subject=sbjct,
html_content=report_text.encode('utf-8'),
dag = tmp_dag)
我做错了什么?
解决方案
而是将on_failure_callback
其作为参数放入default_args
字典并将其传递给 DAG。传递给 DAG 的所有参数defaut_args
都将应用于 DAG 的所有运算符。到目前为止,这是将公共参数应用于 DAG 中所有运算符的唯一方法。
dag = DAG(dag_id=TEST_DAG_NAME,
schedule_interval=None,
start_date=datetime(2019,6,6),
default_args={
'on_success_callback': send_success_report,
'on_failure_callback': send_failed_report
})
推荐阅读
- ios - 我们如何注册一个多平台应用程序以在 iOS 14 中接收通知?
- c++ - Opencv 恢复到比我设置的分辨率更高的分辨率
- php - 哎呀 PHP登录检查散列密码
- python - 我正在尝试通过根据 2 列对其进行排序来组织我的 csv 数据
- c++ - 如何使用 CImg 打开图像?
- php - 带有两个下拉菜单的 PHP 表单未发送
- amazon-web-services - 如何使用 S3FS 挂载 AWS s3 以允许任何用户完全访问
- typescript - typeRoots tsconfig.json 为 node_modules/@types 设置,但尽管安装了所有关联的 @types,但 vs 代码仍然无法识别
- php - 基于选择调用jQuery的下拉菜单
- sql - 在 Oracle 中准确计算经过的年数,考虑闰年