airflow - 气流任务是否可以发送上游失败的错误电子邮件通知?
问题描述
我正在使用气流来安排一些任务,并且我需要下游任务在上游任务失败时发送错误电子邮件通知。我使用下面的示例进行测试。
dag_alert_task_callback = DAG(
'dag_alert_task_callback',
default_args=default_args,
schedule_interval=None
)
t1 = PostgresOperator(task_id='create_schema',
sql="CREATE SCHEMA IF NOT EXISTS dbt_raw_data;",
postgres_conn_id='dbt_postgres_instance_raw_data',
autocommit=True,
database="dbtdb",
dag=dag_alert_task_callback)
t2 = PostgresOperator(task_id='drop_table_aisles',
sql="DROP TABLE IF EXISTS aisles;",
postgres_conn_id='dbt_postgres_instance_raw_data',
autocommit=True,
database="dbtdb",
dag=dag_alert_task_callback)
t5 = PostgresOperator(task_id='drop_table_test',
sql="DROP TABLE TEST;",
postgres_conn_id='dbt_postgres_instance_raw_data',
autocommit=True,
database="dbtdb",
dag=dag_alert_task_callback)
t3 = PostgresOperator(task_id='create_aisles',
sql="create table if not exists dbt_raw_data.aisles (aisle_id integer, aisle varchar(100) );",
postgres_conn_id='dbt_postgres_instance_raw_data',
autocommit=True,
database="dbtdb",
dag=dag_alert_task_callback,
email_on_failure=True)
t4 = PostgresOperator(task_id='load_aisles',
sql="COPY dbt_raw_data.aisles FROM '/sample_data/aisles.csv' DELIMITER ',' CSV HEADER;",
postgres_conn_id='dbt_postgres_instance_raw_data',
autocommit=True,
database="dbtdb",
dag=dag_alert_task_callback)
t1 >> [t2, t5] >> t3 >> t4
dag 图视图就像 [dag 执行图视图] [1]:https ://i.stack.imgur.com/Vk8wF.png 我测试运行 dag 并在 t5 中删除一个名为 TEST 的表,但是当 t5 是失败,下游任务 t3 无法发送错误通知邮件,无法满足我的需要。
解决方案
您必须为每个任务定义 on_failure_callback 以在任务失败的情况下执行此代码:
推荐阅读
- spss - 我可以避免在 SPSS 语法中硬编码文件位置吗?
- javascript - 登录表单可操作
- java - IBM MobileFirst 7.1 中生成密钥库的位置
- gradle - Gradle 与 cpp-application 插件,如何设置包含路径?
- vim - 如何使 vim 拼写错误仅搜索当前行?
- google-apps-script - 在谷歌表格中,我可以将标准函数包装在自定义函数中以控制它何时运行?
- python - 如果触发了某个标志,我想停止一个方法而不完成它的指令
- go - 使用 gofpdf 附加其他 pdf 文件
- python - 出现弹性搜索映射错误时如何查看显式字段名称
- react-native - 如何解决未定义的不是对象(评估'RCTToastAndroid.TOAST')