首页 > 解决方案 > 如何获取失败的 BashOperator 的退出代码?

问题描述

我可以根据这个有用的教程为失败的进程集成 Slack 通知。

def task_fail_slack_alert(context):
    """
    Sends message to a slack channel.
    If you want to send it to a "user" -> use "@user",
        if "public channel" -> use "#channel",
        if "private channel" -> use "channel"
    """
    slack_channel = BaseHook.get_connection(SLACK_CONN_ID).login
    slack_token = BaseHook.get_connection(SLACK_CONN_ID).password
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel=slack_channel,
        token=slack_token,
        text="""
            :red_circle: Task Failed. 
            *Task*: {task}  
            *Dag*: {dag} 
            *Execution Time*: {exec_date}  
            *Log Url*: {log_url} 
            """.format(
            task=context.get('task_instance').task_id,
            dag=context.get('task_instance').dag_id,
            ti=context.get('task_instance'),
            exec_date=context.get('execution_date'),
            log_url=context.get('task_instance').log_url,
        )
    )
    return failed_alert.execute(context=context)

task_with_failed_slack_alerts = BashOperator(
    task_id='fail_task',
    bash_command='exit 1',
    on_failure_callback=slack_failed_task,
    provide_context=True,
    dag=dag)

context从上面的代码示例中,我可以看到我们可以使用传递给函数的变量来获取有关任务的信息task_fail_slack_alert(context),例如task=context.get('task_instance').task_id

我想知道我们是否可以通过包含失败的实际原因来使这个松弛通知更具信息性。如果我们的 bash 脚本以特定的退出代码退出,这是传递给context发送给on_failure_callback函数的字典吗?

例如:

if context.get('task_instance').error_code == 2:
    message_error = 'Duplicate Key Violation'
else:
    message_error = 'Unknown error, investigate logs'

failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel=slack_channel,
        token=slack_token,
        text="""
            :red_circle: Task Failed. 
            *Task*: {task}  
            *Dag*: {dag} 
            *Execution Time*: {exec_date}  
            *Log Url*: {log_url}
            *Error*: {message_error} 
            """.format(
            task=context.get('task_instance').task_id,
            dag=context.get('task_instance').dag_id,
            ti=context.get('task_instance'),
            exec_date=context.get('execution_date'),
            log_url=context.get('task_instance').log_url,
            message_error=message_error,
        )
    )
return failed_alert.execute(context=context)

标签: bashairflow

解决方案


从源代码BashOperator

:param xcom_push: If xcom_push is True, the last line written to stdout 
  will also be pushed to an XCom when the bash command completes.
:type xcom_push: bool

编辑:有关它是什么XCom以及如何使用它的更多信息,请查看此处的文档


推荐阅读