首页 > 解决方案 > 显式跳过 DAG 时的 Slack 通知

问题描述

在 Airflow 中,我知道您可以使用 on_success_callback 和 on_failure_callback 自动发送松弛通知,在我的情况下它们工作正常。

在我的用例中,我有一个 ETL,如果当天数据为空且工作正常,它将引发 AirflowSkipException。但这会向我的 slack 发送成功通知

我想知道是否有类似 on_skip_callback 之类的东西,或者有一种方法可以将我的 DAG 在当天被跳过的通知发送到我的 slack。

任何帮助将不胜感激。谢谢

编辑:为我的 ETL 添加了代码参考。数据点是从数据库中获取的,它可能每天都在变化,有时如果没有要处理的数据,那么数据点将为空,反之亦然。


def ETL_function():

    # Retrieve data code 

    .... 


    # Validation to check if ETL data is empty
    if not datapoints:
        print("OUTPUT LOG : ETL Data not found/empty")
        print("OUTPUT LOG : ETL skipped due to empty data, Skipping ETL ...... ")
        raise AirflowSkipException
        # return False
    else : 
        print("OUTPUT LOG : ETL Data found")
        print("OUTPUT LOG : ETL continued due data available , Running ETL ...... ")
        # return True 


   # ETL Process code

    ....


ETL_function_Task = PythonOperator(
    task_id='ETL_function',
    provide_context=True,
    python_callable=fleet_behavior_idling,
    on_success_callback=task_success_slack_alert,
    dag=dag,
)

标签: airflowslackslack-api

解决方案


嗨@Anindhito Irmandharu,

ShortCircuitOperator为此,您可以使用从 PythonOperator 派生的。

def ETL_function():
    ...
    # Validation to check if ETL data is empty
    if not datapoints:
        print("OUTPUT LOG : ETL Data not found/empty")
        print("OUTPUT LOG : ETL skipped due to empty data, Skipping ETL ...... ")
        return False
    else : 
        print("OUTPUT LOG : ETL Data found")
        print("OUTPUT LOG : ETL continued due data available , Running ETL ...... ")
        return True 

ETL_function_Task = ShortCircuitOperator(
    task_id="ETL_function",
    python_callable= ETL_function,
    provide_context=True,
    dag=dag,
)

ETL_function_Task >> downstream_Tasks

注意:您的下游任务将被跳过,但此任务“ETL_function_Task”将进入成功状态。我不确定为什么您需要为成功执行的任务发送松弛通知。虽然你可以很容易地改变

on_success_callback=task_success_slack_alert

根据您的要求。task_skipped_slack_alert在你正在使用的 slack_hook 中写一个新的。


推荐阅读