airflow-scheduler - 如何在气流 DAG 中将数字设置为重试条件?
问题描述
在我的Airflow DAG
我有 4tasks
task_1 >> [task_2,task_3]>> task_4
task_4
仅在成功运行两者后task_2
运行task_3
我如何设置条件,例如:
如果task_2
失败,请task_2
在 2 分钟后重试,并在第 5 次尝试后停止重试
这是我的代码:
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
args={
'owner' : 'Anti',
'start_date':days_ago(1)# 1 means yesterday
}
dag = DAG(dag_id='my_sample_dag',default_args=args,schedule_interval='15 * * * *')
def func1(**context):
print("ran task 1")
def func2(**context):
print("ran task 2")
def func3(**context):
print("ran task 3")
def func4(**context):
print("ran task 4")
with dag:
task_1=PythonOperator(
task_id='task1',
python_callable=func1,
provide_context=True,
)
task_2=PythonOperator(
task_id='task2',
python_callable=func2,
provide_context=True
)
task_3=PythonOperator(
task_id='task3',
python_callable=func3,
provide_context=True
)
task_4=PythonOperator(
task_id='task4',
python_callable=func4,
provide_context=True
)
task_1 >> [task_2,task_3]>> task_4 # t2,t3 runs parallel right after t1 has ran
解决方案
每个运营商都支持retry_delay
和retries
- Airflow 文档。
retries (int) -- 在任务失败之前应该执行的重试次数
retry_delay (datetime.timedelta) – 重试之间的延迟
如果您想将此应用于所有任务,您只需编辑您的 args 字典:
args={
'owner' : 'Anti',
'retries': 5,
'retry_delay': timedelta(minutes=2),
'start_date':days_ago(1)# 1 means yesterday
}
如果您只想将其应用到 task_2,您可以直接将其传递给PythonOperator
- 在这种情况下,其他任务使用默认设置。
对您的参数有一条评论,不建议设置动态相对start_date
日期,而是设置固定的绝对日期。
推荐阅读
- mysql - 无法在 MYSQL 中解析 varchar YYYY-mm-ddTHH:MM:ss
- sql-server - SQL Server Service Broker - 改进 SQL 执行框架的方法
- javascript - Chrome 本地 IndexedDB (PouchDB) 复制到 CouchDB 问题
- c# - unity camera 通过鼠标输入旋转,如何旋转移动到默认位置?
- python - python中的二维列表
- r - 是否可以仅更改 1 个方面标题的对齐方式
- sql - Oracle中的数字格式问题
- javascript - 如何从 Google API Json 响应中获取特定值?
- fluent-bit - 如何在尾部插件中获取偏移字段或行号?
- mysql - 当 kivy 应用程序被杀死时关闭数据库连接