首页 > 解决方案 > 使用数据库信息构建动态 DAG

问题描述

我是 Airflow 的新手,我正在尝试找出使用从数据库中检索到的信息动态创建一组 DAG 的最佳方法。目前我已经想到了这个可能的解决方案:

# file: dags_builder_dag.py in DAG_FOLDER

# Get info to build required dags from DB
dag_info = api_getDBInfo()
# Dynamically create dags based on info retrieved
for dag in dag_info:
    dag_id = 'hello_world_child_{}'.format(str(dag['id']))
    default_args_child = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
    # Add dag to global scope to let airflow digest it.
    globals()[dag_id] = create_dag(dag_id, default_args_child)

但是,如果我没记错的话,所有 dag 文件,包括在本示例中生成所有 dag 的文件(dags_builder_dag.py),都会被 Airflow 定期解析,这意味着 api_getDBInfo() 将在每次解析时执行. 如果我是对的,那将是避免连续执行 api_getDBInfo() 的最佳实践,这对数据库来说可能是一项耗时的操作?理想情况下,仅在需要时才应检索此信息,例如手动触发。

我想到的其他可能的解决方法:

# file: dags_builder_dag.py in DAG_FOLDER

buildDAGs = Variables.get('buildDAGs')
if buildDAGs == 'true':
  # Get info to build required dags from DB
  dag_info = api_getDBInfo()
  # Dynamically create dags based on info retrieved
  for dag in dag_info:
      dag_id = 'hello_world_child_{}'.format(str(dag['id']))
      default_args_child = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
      # Add dag to global scope to let airflow digest it.
      globals()[dag_id] = create_dag(dag_id, default_args_child)

更新

感谢@NicoE 和@floating_hammer,我找到了适合我用例的解决方案。

第一次尝试:气流变量作为缓存

我可以使用气流变量作为存储在数据库中的数据的缓存,以避免连续调用“api_getDBInfo”。但是,通过这种方式,我遇到了另一个瓶颈:可变大小。气流变量是键值对。键的长度为 256。存储在元数据中的值将受到元数据数据库支持的字符串大小的限制。https://github.com/apache/airflow/blob/master/airflow/models/variable.py https://github.com/apache/airflow/blob/master/airflow/models/base.py

在我的情况下,我使用的是Amazon MWAA,与 aws 使用的底层元数据库相关的详细信息及其结构可能很难找到(实际上我并没有尝试进行很多调查)。所以我只是执行了一个压力测试,强制变量内部的大量数据来看看会发生什么。下面是结果:

数据量 结果
~0,5 MB(当前) 写入和读取操作没有问题。
~50 MB (x100) 写入和读取操作没有问题。
~125 MB (x250) 写入和读取操作没有问题,但使用气流的 Web 控制台,无法访问变量部分。服务器返回错误 502“Bad gateway”
~250 MB (x500) 写入变量失败。

第二次尝试:S3 文件作为缓存

正如之前的测试所示,气流变量有一个限制,所以我尝试保持相同的模式,使用 S3 文件更改气流变量,考虑到 S3 没有限制,这适用于我的特定用例在空间中作为气流变量。

简单总结一下:

  1. 我创建了一个名为:“sync_db_cache_dag”的 dag,它每小时使用 api_getDBInfo() 检索的数据更新 S3“db_cache.json”。数据以 JSON 格式存储。
  2. 脚本“dags_builder_dag.py”现在从“db_cache.json”中检索数据,这样数据库就不用连续调用“api_getDBInfo”了。

标签: pythonairflowairflow-scheduler

解决方案


您可以尝试以下步骤。

  • 创建一个变量,该变量将保存任务的配置和要创建的任务数。

创建一个以设定频率触发的 DAG。dag 有两个任务。

  • 任务 1 读取数据库并填充变量。
  • 任务 2 读取变量并创建多个任务。

推荐阅读