python - 使用数据库信息构建动态 DAG
问题描述
我是 Airflow 的新手,我正在尝试找出使用从数据库中检索到的信息动态创建一组 DAG 的最佳方法。目前我已经想到了这个可能的解决方案:
# file: dags_builder_dag.py in DAG_FOLDER
# Get info to build required dags from DB
dag_info = api_getDBInfo()
# Dynamically create dags based on info retrieved
for dag in dag_info:
dag_id = 'hello_world_child_{}'.format(str(dag['id']))
default_args_child = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
# Add dag to global scope to let airflow digest it.
globals()[dag_id] = create_dag(dag_id, default_args_child)
但是,如果我没记错的话,所有 dag 文件,包括在本示例中生成所有 dag 的文件(dags_builder_dag.py),都会被 Airflow 定期解析,这意味着 api_getDBInfo() 将在每次解析时执行. 如果我是对的,那将是避免连续执行 api_getDBInfo() 的最佳实践,这对数据库来说可能是一项耗时的操作?理想情况下,仅在需要时才应检索此信息,例如手动触发。
我想到的其他可能的解决方法:
- 使用Airfow 变量作为标志来评估是否需要再次解析 dags_builder_dag.py 此变量可以通过以下方式使用:
# file: dags_builder_dag.py in DAG_FOLDER
buildDAGs = Variables.get('buildDAGs')
if buildDAGs == 'true':
# Get info to build required dags from DB
dag_info = api_getDBInfo()
# Dynamically create dags based on info retrieved
for dag in dag_info:
dag_id = 'hello_world_child_{}'.format(str(dag['id']))
default_args_child = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
# Add dag to global scope to let airflow digest it.
globals()[dag_id] = create_dag(dag_id, default_args_child)
- 将airflow.cfg文件的min_file_process_interval参数设置为较高的值,以避免不断解析。然而,这也有增加 dags 运行时延迟的缺点。
更新
感谢@NicoE 和@floating_hammer,我找到了适合我用例的解决方案。
第一次尝试:气流变量作为缓存
我可以使用气流变量作为存储在数据库中的数据的缓存,以避免连续调用“api_getDBInfo”。但是,通过这种方式,我遇到了另一个瓶颈:可变大小。气流变量是键值对。键的长度为 256。存储在元数据中的值将受到元数据数据库支持的字符串大小的限制。https://github.com/apache/airflow/blob/master/airflow/models/variable.py https://github.com/apache/airflow/blob/master/airflow/models/base.py
在我的情况下,我使用的是Amazon MWAA,与 aws 使用的底层元数据库相关的详细信息及其结构可能很难找到(实际上我并没有尝试进行很多调查)。所以我只是执行了一个压力测试,强制变量内部的大量数据来看看会发生什么。下面是结果:
数据量 | 结果 |
---|---|
~0,5 MB(当前) | 写入和读取操作没有问题。 |
~50 MB (x100) | 写入和读取操作没有问题。 |
~125 MB (x250) | 写入和读取操作没有问题,但使用气流的 Web 控制台,无法访问变量部分。服务器返回错误 502“Bad gateway” |
~250 MB (x500) | 写入变量失败。 |
第二次尝试:S3 文件作为缓存
正如之前的测试所示,气流变量有一个限制,所以我尝试保持相同的模式,使用 S3 文件更改气流变量,考虑到 S3 没有限制,这适用于我的特定用例在空间中作为气流变量。
简单总结一下:
- 我创建了一个名为:“sync_db_cache_dag”的 dag,它每小时使用 api_getDBInfo() 检索的数据更新 S3“db_cache.json”。数据以 JSON 格式存储。
- 脚本“dags_builder_dag.py”现在从“db_cache.json”中检索数据,这样数据库就不用连续调用“api_getDBInfo”了。
解决方案
您可以尝试以下步骤。
- 创建一个变量,该变量将保存任务的配置和要创建的任务数。
创建一个以设定频率触发的 DAG。dag 有两个任务。
- 任务 1 读取数据库并填充变量。
- 任务 2 读取变量并创建多个任务。
推荐阅读
- c# - 如何让台球杆围绕母球旋转?
- python - 是否可以使用 Earth Engine API for python 获取图像集中每个图像的下载 URL?
- actions-on-google - 谷歌助理没有说出欢迎文字
- angular - Form - 将所有输入格式化为大写的指令
- reactjs - reactjs中如何同时绑定两个方法
- firebase - 对文档的部分写入权限
- python - rpy2 importr 错误:无法导入任何包
- google-bigquery - 具有比特币输入和输出的重复列名
- installation - Inno Setup 安装程序多个页面上的组件
- sql - 如何在查询中汇总特定字符串