首页 > 解决方案 > 从单个 python 源代码生成数百个 DAG 时气流变慢

问题描述

在我们的大数据项目中,有大约 3000 个表要加载,所有这些表都应该由 Airflow 中的单独 DAG 处理。

在我们的解决方案中,单个 python 文件生成每种类型的表加载器,因此它们可以通过 REST API 以基于事件的方式通过 Cloud Function 单独触发。因此,我们使用以下方法生成 DAG:

不幸的是,我们绑定到 Airflow v1.xx 版本

问题:

我们注意到,当生成多个 DAG 时,Airflow/Cloud Composer 在任务执行之间的速度明显变慢。当仅生成 10-20 个 DAG 时,Task 执行之间的时间要快得多,然后我们就有 100-200 个 DAG。当生成 1000 个 DAG 时,即使没有执行其他 DAG,在完成给定 DAG 的先前任务后也需要几分钟才能启动新任务。

我们不明白为什么任务执行时间会受到生成的 DAG 数量的影响那么大。气流在其元数据库中搜索任务实例所需参数的时间不应该接近恒定吗?我们不确定 Google 是否正确配置/扩展/管理 Cloud Composer。

问题:


这是我们使用的生成器代码的一个非常简单的示例:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def create_dag(dag_id, schedule, dag_number, default_args):

    def example(*args):
        print('Example DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)

    with dag:
        t1 = PythonOperator(task_id='example', python_callable=example)

    return dag


for dag_number  in range(1, 5000):
    dag_id = 'Example_{}'.format(str(dag_number))
    default_args = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
    globals()[dag_id] = create_dag(dag_id, '@daily', dag_number, default_args)

标签: airflowairflow-schedulergoogle-cloud-composer

解决方案


是的。这是一个已知问题。它已在 Airflow 2 中修复。

这是在 Airflow 1 中处理 DAG 文件的方式所固有的(主要是关于生成的查询数量)。

除了迁移到 Airflow 2,您无能为力。解决这个问题需要对 Airflow 调度程序逻辑进行完全重构和半重写。

减轻它的一种方法 - 您可以潜在地,而不是从单个文件生成所有 DAG,将其拆分为许多文件。例如,您可以生成 3000 个单独的、动态生成的小 DAG 文件,而不是在单个 Python 文件中生成 DAG 对象。这将更好地扩展。

然而,好消息是,在 Airflow 2 中,这比 Airflow 2 快很多倍且可扩展。并且 Airlfow 1.10 达到 EOL 并且不再支持并且不会收到任何更新。因此,与其改变流程,我强烈建议您迁移。


推荐阅读