Retrying an Airflow task instance only on certain exceptions

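Question

What is the best way to retry an Airflow operator only for certain failures/exceptions?

For example, suppose I have an Airflow task that depends on the availability of an external service. If this service becomes unavailable during task execution, I want to retry later (at most 3 retries). For other failures, I do not want to retry.

My current approach is to use on_failure_callback and manipulate context["ti"].task.retries on the desired exception by parsing context["exception"], but I find this messy and hard to understand. Are there better options?

Tags: python, airflow, google-cloud-composer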

Answer


Most of Airflow's operators use a Hook class to do the actual work.

If you can write your own PythonOperator callable, wrap the hook call in try/except: catch the exceptions that should not cause a retry, and let the exceptions that should trigger a retry propagate out of the callable. This fits Airflow's architecture seamlessly:

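# Callable for the PythonOperator.
# SomeHook and IgnorableException are placeholders for your own hook
# and exception types.
from airflow.operators.python import PythonOperator  # Airflow 2.x import path


def my_operation():
    try:
        hook = SomeHook()
        hook.use_it()
    except IgnorableException:
        # Swallowed: the task finishes as a success, so no retry happens.
        # Any other exception propagates and fails the task, which makes
        # Airflow retry it while retries remain.
        pass


# Then wire the callable into a task:
my_operator = PythonOperator(
    task_id='my-operator',
    python_callable=my_operation,
)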

This approach gives you more control over the operator and DAG life cycle.
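The question also asks that other failures should fail the task without any retry, rather than be silently ignored. One possible sketch of that, assuming an Airflow version that provides `airflow.exceptions.AirflowFailException` (which fails a task without retrying it): re-raise the retryable error unchanged and convert everything else. `ServiceUnavailableError`, `call_external_service`, and `run_with_selective_retry` are hypothetical names introduced for illustration, and the `ImportError` fallback exists only so the sketch can run outside an Airflow environment.

```python
try:
    from airflow.exceptions import AirflowFailException
except ImportError:
    # Stand-in so this sketch runs without Airflow installed (assumption:
    # in a real DAG file the import above succeeds and this is never used).
    class AirflowFailException(Exception):
        """Placeholder for airflow.exceptions.AirflowFailException."""


class ServiceUnavailableError(Exception):
    """Hypothetical error raised when the external service is down."""


def call_external_service():
    """Hypothetical placeholder for the real hook call."""
    raise ServiceUnavailableError("service is down")


def run_with_selective_retry(operation=call_external_service):
    """Run `operation`, retrying only when the service is unavailable."""
    try:
        return operation()
    except ServiceUnavailableError:
        # An ordinary exception: Airflow retries the task while retries remain.
        raise
    except Exception as exc:
        # AirflowFailException fails the task immediately, skipping retries.
        raise AirflowFailException(f"non-retryable failure: {exc}") from exc
```

Passed as `python_callable` to a `PythonOperator` created with `retries=3` (and optionally a `retry_delay`), this should retry only when the service is unavailable and fail right away on anything else.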

