airflow - 从单个 python 源代码生成数百个 DAG 时气流变慢
问题描述
在我们的大数据项目中,有大约 3000 个表要加载,所有这些表都应该由 Airflow 中的单独 DAG 处理。
在我们的解决方案中,单个 python 文件生成每种类型的表加载器,因此它们可以通过 REST API 以基于事件的方式通过 Cloud Function 单独触发。因此,我们使用以下方法生成 DAG:
- 用于 DAG 生成器逻辑的气流变量
- 要生成的表名列表
- 表类型:插入追加、截断加载、scd1、scd2
- 表加载器 DAG 的特定运算符使用的气流变量,例如:
- RR_TableN = {} // 用于操作员处理 RawToRaw 的 python dict
- RC_TableN = {} // 用于操作员处理 RawToCuration 的 python dict
- 用户定义的宏:
- 我们尽量不要在任务定义之间使用“静态 Python 代码”,因为它们会在 DAG 生成过程中执行
- 用户定义的宏仅在 DAG 执行时间内进行评估
不幸的是,我们绑定到 Airflow v1.xx 版本
问题:
我们注意到,当生成多个 DAG 时,Airflow/Cloud Composer 在任务执行之间的速度明显变慢。当仅生成 10-20 个 DAG 时,Task 执行之间的时间要快得多,然后我们就有 100-200 个 DAG。当生成 1000 个 DAG 时,即使没有执行其他 DAG,在完成给定 DAG 的先前任务后也需要几分钟才能启动新任务。
我们不明白为什么任务执行时间会受到生成的 DAG 数量的影响那么大。气流在其元数据库中搜索任务实例所需参数的时间不应该接近恒定吗?我们不确定 Google 是否正确配置/扩展/管理 Cloud Composer。
问题:
- 从 Airflow 方面来看,这种放缓背后的原因是什么?
- 我们如何才能减少任务执行之间的等待时间并加快整个过程?
- 这是我们正在实现的“糟糕的设计模式”(生成器和用户定义的宏处理气流变量)吗?
- 如果是这样,我们如何以更有效的方式做类似的事情(表分隔的 DAG、单个代码库等)?
这是我们使用的生成器代码的一个非常简单的示例:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def create_dag(dag_id, schedule, dag_number, default_args):
def example(*args):
print('Example DAG: {}'.format(str(dag_number)))
dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
with dag:
t1 = PythonOperator(task_id='example', python_callable=example)
return dag
for dag_number in range(1, 5000):
dag_id = 'Example_{}'.format(str(dag_number))
default_args = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
globals()[dag_id] = create_dag(dag_id, '@daily', dag_number, default_args)
解决方案
是的。这是一个已知问题。它已在 Airflow 2 中修复。
这是在 Airflow 1 中处理 DAG 文件的方式所固有的(主要是关于生成的查询数量)。
除了迁移到 Airflow 2,您无能为力。解决这个问题需要对 Airflow 调度程序逻辑进行完全重构和半重写。
减轻它的一种方法 - 您可以潜在地,而不是从单个文件生成所有 DAG,将其拆分为许多文件。例如,您可以生成 3000 个单独的、动态生成的小 DAG 文件,而不是在单个 Python 文件中生成 DAG 对象。这将更好地扩展。
然而,好消息是,在 Airflow 2 中,这比 Airflow 2 快很多倍且可扩展。并且 Airlfow 1.10 达到 EOL 并且不再支持并且不会收到任何更新。因此,与其改变流程,我强烈建议您迁移。
推荐阅读
- c# - 从 C# 应用程序中删除远程服务器上的 MySql 数据库表
- ngrx - NgRx。使用状态信息调度动作
- javascript - React Materialize Parallax 图像不随背景滚动
- laravel - 为什么在运行“php artisan migrate --env=local”后创建“迁移”和“用户”表
- vba - 仅在第一次时在 MouseDown/MouseUp 上清空预填充的 TextBox
- javascript - Xampp 接受 Ajax 请求
- c# - c#异步运行多个process.Start()
- javascript - all: function() 是做什么的?
- r - 如何在 R 中将矩阵作为 data.frame 的元素放置?
- azure - Azure DevOps 将路径过滤器定义为 yml 构建文件