airflow - 如何重试完成 Airflow DAG?
问题描述
我知道可以重试单个任务,但是可以重试完整的 DAG 吗?
我动态创建任务,这就是为什么我需要重试的不是特定任务,而是完成 DAG。如果 Airflow 不支持它,也许有一些解决方法。
解决方案
我编写了以下脚本并将其安排在气流主机上,为“dag_ids_to_monitor”数组中提到的 DAG 重新运行失败的 DAG 运行
import subprocess
import re
from datetime import datetime
dag_ids_to_monitor = ['dag1','dag2','dag2']
def runBash(cmd):
print ("running bash command {}".format(cmd))
output = subprocess.check_output(cmd.split())
return output
def datetime_valid(dt_str):
try:
datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S')
print(dt_str)
print(datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S'))
except:
return False
return True
def get_schedules_to_rerun(dag_id):
bashCommand = "airflow list_dag_runs --state failed {dag_id}".format(dag_id=dag_id)
output = runBash(bashCommand)
schedules_to_rerun = []
for line in output.split('\n'):
parts = re.split("\s*\|\s*", line)
if len(parts) > 4 and datetime_valid(parts[3][:-6]):
schedules_to_rerun.append(parts[3])
return schedules_to_rerun
def trigger_runs(dag_id, re_run_start_times):
for start_time in re_run_start_times:
runBash("airflow clear --no_confirm --start_date {sd} --end_date {sd} {dag_id}".format(sd=start_time, dag_id=dag_id))
def rerun_failed_dag_runs(dag_id):
re_run_start_times = get_schedules_to_rerun(dag_id)
trigger_runs(dag_id,re_run_start_times)
for dag_id in dag_ids_to_monitor:
rerun_failed_dag_runs(dag_id)
推荐阅读
- c# - 数据绑定时防止数据网格中的重复
- reactjs - 我在我的 jsfiddle 中收到错误未关闭的正则表达式
- android - 在片段中动态生成的 RadioGroup 中获取选中按钮
- redis - 无法加载密钥:redis 服务器不支持扫描命令
- reactjs - 带有 react/redux/router 流的搜索组件
- git - Jenkins 在构建之前无法清理工作区
- spring - 使用 Get 方法 @QueryParam 将枚举列表传递给 Spring REST
- reactjs - 无法将所需的参数传递给反应渲染()的jsx内的函数
- javascript - 如何在 React 中处理 CORS
- c - 如何将二进制转换为十六进制并将值写入某个地址?