首页 > 解决方案 > 动态创建的 dag 会自动从 Airflow UI 中删除

问题描述

我对Apache Airflow很陌生,所以我不确定气流的所有功能。

我的要求是为新添加到数据库的条目创建动态 dag。检查是否有新添加到 DB 的标准是是否dag_id针对该条目进行了更新。如果没有,那么我们需要为它创建一个新的 dag 并dag_id针对该条目更新。

Dag_id 为 Null 的数据库中的新条目

Id  :d001   ,
Dag_id :Null

一旦 main dag(用于创建动态 dag)运行并获取此条目,就会创建一个 dag 并将dag_id其更新回数据库。

Id : d001 ,
Dag_id : dagid-001

我面临的挑战是一旦主要 dags 再次运行以获取新添加的条目(这次我们不会检索 d001)。之前创建的动态 dags(dagid-001) 将从Airflow UI 和Airflow list_dags 中删除。

来自主 dag 的代码以创建动态 dag

dag_id = 'hello_world_{}'.format(str(new_entry[0]))

    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)
                    }

    schedule = ## frequency 

    dag_number = new_entry

    globals()[dag_id] = create_dag(dag_id,
                                  schedule,
                                  dag_number,
                                  default_args)

这是动态 dag 的预期行为还是我在创建它们时遗漏了什么?

标签: airflow

解决方案


DAG对于 Airflow,只有在扫描目录中的 Python 文件时找到相应的对象时,才会存在 DAG dag

因此,您的主 DAG 不仅要为新的数据库条目生成 DAG,还要为所有以前的条目生成 DAG。


推荐阅读