首页 > 解决方案 > Airflow 中的延迟通知系统

问题描述

当 DAG 或 Task Fails 或 Success 或 Delayed 时,我正在尝试实施气流警报/通知系统。我成功地实现了 FAILURE 和 SUCCESS 通知,并且还成功地使用 SLA 部分实现了延迟通知系统,但 SLA 的局限性在于它只能与计划的 DAG 一起使用,我们系统中的许多 DAG 都是基于触发器的。有没有办法在 Airflow 中实现没有 SLA 的延迟通知系统?提前致谢 !

标签: pythongoogle-cloud-platformairflowairflow-schedulergoogle-cloud-composer

解决方案


服务水平协议 (SLA) 提供了在任务超出其从 DAG 执行开始的预期时间范围的情况下发送电子邮件的功能,根据官方文档和这篇中篇文章,Airflow 目前不支持此功能触发的 DAG;在这种情况下,我建议您在Airflow Issue Tracker中打开一个改进问题以请求支持此功能。

另一方面,在 DAG 完成运行后,您可以使用以下方法来验证 SLA on_success_callback

def success_function (context):
   s_date = context.get('task_instance').start_date.replace(tzinfo = None)
   e_date = datetime.now().replace(tzinfo = None)
  
   execution_time = e_date - s_date
   sla = timedelta (seconds = 30)
 
   if execution_time> sla:
       print('send_email')
   else:
       print('no send email')

t1 = BashOperator (
   task_id = 'print_date',
   bash_command = 'date',
   dag = dag,
   on_success_callback = success_function
)

如果您实施此解决方法,请牢记以下注意事项:

  • 上下文不包含任务中配置的 SLA 信息;因此,有必要在函数中为要验证 SLA 的每个任务指定它
  • 错过的 SLA 不会出现在 Airflow UI 中
  • 有必要创建发送电子邮件的逻辑

推荐阅读