首页 > 解决方案 > 为什么 Airflow PythonOperator 任务失败但返回码为 0?

问题描述

我有一个与 PythonOperator 一起运行的 Airflow DAG,我想知道为什么我的任务执行失败但返回代码 0 退出?

执行失败并返回代码为零误导我将任务视为成功执行。

您可以在下面看到工作日志或附件图片,谁能解释为什么会发生这种情况并建议如何避免这种情况?

任务实例日志:

[2019-11-15 22:45:23,633] {base_task_runner.py:115} 信息 - 作业 736:子任务 http_request_send_push 2019-11-15 22:45:23,632 - 10688 - 错误 - 74 - http_request_send_push:http_request_send_push 服务触发重新发送-推送错误::

[2019-11-15 22:45:23,633] {logging_mixin.py:112} 信息-

[2019-11-15 22:45:23,632] {notification.py:74} 错误 - http_request_send_push:http_request_send_push 服务触发器-重新发送-推送错误::

[2019-11-15 22:45:23,633] {python_operator.py:114} 信息 - 完成。返回值为:无

[2019-11-15 22:45:25,251] {logging_mixin.py:112} 信息-

[2019-11-15 22:45:25,250] {local_task_job.py:103} INFO - 任务退出并返回代码 0

任务实例日志截图:

在此处输入图像描述

DAG 树视图截图:

在此处输入图像描述

标签: pythonairflow-schedulerairflow

解决方案


简单来说,PythonOperator就是一个执行python函数的操作符。如果有任何错误并且您希望任务failed说明,那么您需要在 python 可调用函数中引发异常。在下面的示例代码中,请参阅fourth_task.

另一种方法是使用ShortCircuitOperator. 以下是Apache Airflow API 参考指南中的描述:

它评估条件并在条件为 False 时使工作流程短路。任何下游任务都标记为“已跳过”状态。如果条件为 True,则下游任务正常进行。

PythonOperator请参阅下面的示例代码,它解释了和之间的区别ShortCircuitOperator。还展示了如何引发异常并将任务更改为failed状态。

def first_task(**kwargs):
    logging.info("first_task")


def second_task(**kwargs):
    logging.info("second_task")
    return True


def third_task(**kwargs):
    logging.info("third_task")
    return False


def fourth_task(**kwargs):
    logging.info("fourth_task")
    raise Exception()


def fifth_task(**kwargs):
    logging.info("fifth_task")
    return True


def sixth_task(**kwargs):
    logging.info("sixth_task")
    return False

first_task = PythonOperator(
    task_id='first_task',
    provide_context=True,
    python_callable=first_task,
    dag=dag)
first_task_successor = DummyOperator(task_id='first_task_successor', dag=dag)
first_task_successor.set_upstream(first_task)


second_task = PythonOperator(
    task_id='second_task',
    provide_context=True,
    python_callable=second_task,
    dag=dag)
second_task_successor = DummyOperator(task_id='second_task_successor', dag=dag)
second_task_successor.set_upstream(second_task)


third_task = PythonOperator(
    task_id='third_task',
    provide_context=True,
    python_callable=third_task,
    dag=dag)
third_task_successor = DummyOperator(task_id='third_task_successor', dag=dag)
third_task_successor.set_upstream(third_task)


fourth_task = PythonOperator(
    task_id='fourth_task',
    provide_context=True,
    python_callable=fourth_task,
    dag=dag)
fourth_task_successor = DummyOperator(task_id='fourth_task_successor', dag=dag)
fourth_task_successor.set_upstream(fourth_task)


fifth_task = ShortCircuitOperator(
    task_id='fifth_task',
    provide_context=True,
    python_callable=fifth_task,
    dag=dag)
fifth_task_successor = DummyOperator(task_id='fifth_task_successor', dag=dag)
fifth_task_successor.set_upstream(fifth_task)

sixth_task = ShortCircuitOperator(
    task_id='sixth_task',
    provide_context=True,
    python_callable=sixth_task,
    dag=dag)
sixth_task_successor = DummyOperator(task_id='sixth_task_successor', dag=dag)
sixth_task_successor.set_upstream(sixth_task)

截屏: 在此处输入图像描述


推荐阅读