python - Airflow - 处理 DAG 回调的正确方法
问题描述
我有一个DAG
然后每当它成功或失败时,我希望它触发一个发布到 Slack 的方法。
我DAG args
的如下:
default_args = {
[...]
'on_failure_callback': slack.slack_message(sad_message),
'on_success_callback': slack.slack_message(happy_message),
[...]
}
以及DAG
定义本身:
dag = DAG(
dag_id = dag_name_id,
default_args=default_args,
description='load data from mysql to S3',
schedule_interval='*/10 * * * *',
catchup=False
)
但是当我检查 Slack 时,每分钟有超过 100 条消息,好像正在评估每个调度程序的心跳,并且对于每个日志,它确实运行了成功和失败方法,就好像它对同一个任务实例有效并且不适用于同一个任务实例(不是美好的)。
我应该如何正确使用on_failure_callback
andon_success_callback
来处理 dags 状态并调用自定义方法?
解决方案
它创建消息的原因是因为当您定义您的 时default_args
,您正在执行函数。您只需传递函数定义而不执行它。
由于该函数有一个参数,它会变得有点棘手。您可以定义两个部分函数或定义两个包装函数。
所以你可以这样做:
from functools import partial
success_msg = partial(slack.slack_message, happy_message);
failure_msg = partial(slack.slack_message, sad_message);
default_args = {
[...]
'on_failure_callback': failure_msg
'on_success_callback': success_msg
[...]
}
或者
def success_msg():
slack.slack_message(happy_message);
def failure_msg():
slack.slack_message(sad_message);
default_args = {
[...]
'on_failure_callback': failure_msg
'on_success_callback': success_msg
[...]
}
在任何一种方法中,请注意函数定义是如何传递的,failure_msg
而success_msg
不是它们在执行时给出的结果。
推荐阅读
- c# - 将选项卡控件的 SelectedItem 与 MVVM 绑定是个好主意吗?
- c# - Task.Run() 与异步/等待
- sql - 删除 SQL Server 中的重复行
- javascript - Race observable 下的 distinctUntilChanged 未按预期工作
- r - 按组和年份将最大累积和合并到新表中
- tensorflow - “添加可见gpu设备:0..”一直在nohup.out中输出
- python - Async def on_ready():需要 Discord bot Python 故障排除
- spring - 升级到 Spring 和 Spring Security 5.2.0 后的 Mongo Authentication 错误
- angular-cli - 错误:EPERM:不允许操作,读取
- ruby-on-rails - 将 Rails 6 应用程序部署到 Elastic Beanstalk 时出现 Bundler 错误