首页 > 解决方案 > 如何在气流 DAG 中将数字设置为重试条件?

问题描述

在我的Airflow DAG我有 4tasks

task_1 >> [task_2,task_3]>> task_4

task_4仅在成功运行两者后task_2运行task_3

我如何设置条件,例如:

如果task_2失败,请task_2在 2 分钟后重试,并在第 5 次尝试后停止重试

这是我的代码:

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator

args={
    'owner' : 'Anti',
    'start_date':days_ago(1)# 1 means yesterday
}

dag = DAG(dag_id='my_sample_dag',default_args=args,schedule_interval='15 * * * *')

def func1(**context):
    print("ran task 1")

def func2(**context):
    print("ran task 2")

def func3(**context):
    print("ran task 3")

def func4(**context):
    print("ran task 4")

with dag:
    task_1=PythonOperator(
        task_id='task1',
        python_callable=func1,
        provide_context=True,
        
    )
    task_2=PythonOperator(
        task_id='task2',
        python_callable=func2,
        provide_context=True 
    )
    task_3=PythonOperator(
        task_id='task3',
        python_callable=func3,
        provide_context=True 
    )
    task_4=PythonOperator(
        task_id='task4',
        python_callable=func4,
        provide_context=True 
    )

task_1 >> [task_2,task_3]>> task_4 # t2,t3 runs parallel right after t1 has ran

标签: airflow-schedulerairflow

解决方案


每个运营商都支持retry_delayretries- Airflow 文档

retries (int) -- 在任务失败之前应该执行的重试次数

retry_delay (datetime.timedelta) – 重试之间的延迟

如果您想将此应用于所有任务,您只需编辑您的 args 字典:

args={
    'owner' : 'Anti',
    'retries': 5,
    'retry_delay': timedelta(minutes=2),
    'start_date':days_ago(1)# 1 means yesterday
}

如果您只想将其应用到 task_2,您可以直接将其传递给PythonOperator- 在这种情况下,其他任务使用默认设置。

对您的参数有一条评论,不建议设置动态相对start_date日期,而是设置固定的绝对日期。


推荐阅读