首页 > 解决方案 > 如何重试完成 Airflow DAG?

问题描述

我知道可以重试单个任务,但是可以重试完整的 DAG 吗?

我动态创建任务,这就是为什么我需要重试的不是特定任务,而是完成 DAG。如果 Airflow 不支持它,也许有一些解决方法。

标签: airflow

解决方案


我编写了以下脚本并将其安排在气流主机上,为“dag_ids_to_monitor”数组中提到的 DAG 重新运行失败的 DAG 运行

    import subprocess
    import re
    from datetime import datetime
    
    dag_ids_to_monitor = ['dag1','dag2','dag2']
    
    
    
    def runBash(cmd):
        print ("running bash command {}".format(cmd))
        output = subprocess.check_output(cmd.split())
        return output
    
    
    def datetime_valid(dt_str):
        try:
            datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S')
            print(dt_str)
            print(datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S'))
        except:
            return False
        return True
    
    
    def get_schedules_to_rerun(dag_id):
        bashCommand = "airflow list_dag_runs --state failed {dag_id}".format(dag_id=dag_id)
        output = runBash(bashCommand)
    
        schedules_to_rerun = []
        for line in output.split('\n'):
            parts = re.split("\s*\|\s*", line)
            if len(parts) > 4 and datetime_valid(parts[3][:-6]):
                schedules_to_rerun.append(parts[3])
        return schedules_to_rerun
    
    
    def trigger_runs(dag_id, re_run_start_times):
        for start_time in re_run_start_times:
            runBash("airflow clear --no_confirm --start_date {sd} --end_date {sd} {dag_id}".format(sd=start_time, dag_id=dag_id))
    
    
    def rerun_failed_dag_runs(dag_id):
        re_run_start_times = get_schedules_to_rerun(dag_id)
        trigger_runs(dag_id,re_run_start_times)
    
    
    for dag_id in dag_ids_to_monitor:
        rerun_failed_dag_runs(dag_id)
    
    

推荐阅读