首页 > 解决方案 > 气流任务是否可以发送上游失败的错误电子邮件通知?

问题描述

我正在使用气流来安排一些任务,并且我需要下游任务在上游任务失败时发送错误电子邮件通知。我使用下面的示例进行测试。

dag_alert_task_callback = DAG(
    'dag_alert_task_callback',
    default_args=default_args,
    schedule_interval=None
)

t1 = PostgresOperator(task_id='create_schema',
                      sql="CREATE SCHEMA IF NOT EXISTS dbt_raw_data;",
                      postgres_conn_id='dbt_postgres_instance_raw_data',
                      autocommit=True,
                      database="dbtdb",
                      dag=dag_alert_task_callback)

t2 = PostgresOperator(task_id='drop_table_aisles',
                      sql="DROP TABLE IF EXISTS aisles;",
                      postgres_conn_id='dbt_postgres_instance_raw_data',
                      autocommit=True,
                      database="dbtdb",
                      dag=dag_alert_task_callback)

t5 = PostgresOperator(task_id='drop_table_test',
                      sql="DROP TABLE TEST;",
                      postgres_conn_id='dbt_postgres_instance_raw_data',
                      autocommit=True,
                      database="dbtdb",
                      dag=dag_alert_task_callback)

t3 = PostgresOperator(task_id='create_aisles',
                      sql="create table if not exists dbt_raw_data.aisles (aisle_id integer, aisle varchar(100) );",
                      postgres_conn_id='dbt_postgres_instance_raw_data',
                      autocommit=True,
                      database="dbtdb",
                      dag=dag_alert_task_callback,
                      email_on_failure=True)

t4 = PostgresOperator(task_id='load_aisles',
                      sql="COPY dbt_raw_data.aisles FROM '/sample_data/aisles.csv' DELIMITER ',' CSV HEADER;",
                      postgres_conn_id='dbt_postgres_instance_raw_data',
                      autocommit=True,
                      database="dbtdb",
                      dag=dag_alert_task_callback)

t1 >> [t2, t5] >> t3 >> t4

dag 图视图就像 [dag 执行图视图] [1]:https ://i.stack.imgur.com/Vk8wF.png 我测试运行 dag 并在 t5 中删除一个名为 TEST 的表,但是当 t5 是失败,下游任务 t3 无法发送错误通知邮件,无法满足我的需要。

标签: airflow

解决方案


您必须为每个任务定义 on_failure_callback 以在任务失败的情况下执行此代码:

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html?highlight=on_failure_callback


推荐阅读