首页 > 解决方案 > 如何缩短在 GCP Composer 上检测新 Dag 的时间?(空气流动)

问题描述

我在 GCP (Airflow) 上创建了一个 Composer 集群,它工作得很好,但对于即时任务来说速度很慢。我在本地文件中创建了下一个 DAG:

import datetime
from airflow import models
from pprint import pprint as pp
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator
import datetime as dt

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.bash_operator import BashOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)


def create_task_dag(dag_id, schedule, start_date, task_id):

    dag = models.DAG(
        dag_id=dag_id,
        schedule_interval=schedule,
        start_date=start_date
    )

    with dag:
        _operator_name = KubernetesPodOperator(
            task_id='#task_type#-#taskid#',
            name='#task_type#-#taskid#',
            cmds=['bash', '-cx'],
            arguments=["command"],
            env_vars=#env_vars#,
            namespace='default',
            image='image:latest',
            image_pull_secrets="secret",
            image_pull_policy='Always',
            get_logs=True
        )
        _operator_name
        return dag

task_id='#task_type#-#taskid#'
dag_id = 'dag-#task_type#-#taskid#'
schedule = '@once'
start_date = datetime.datetime.now()

globals()[dag_id] = create_task_dag(
    dag_id=dag_id,
    schedule=schedule,
    start_date=start_date,
    task_id=task_id)

当我上传该文件时,它需要很长时间才能运行,所以,我补充说dag_dir_list_interval=1,它有所改进,但需要 20-30 秒才能出现在 Airflow 的仪表板上,而在 Kubernetes 上启动需要 40-60 秒。

这意味着我需要将近一分钟的时间来运行一个 Hello 世界。

我可能有错误吗?

PS:我检查了调度程序,它在 2-5 秒后出现在 pod 磁盘中。

映像版本:composer-1.8.4-airflow-1.10.3 集群:Kubernetes 上的 3 个节点n1-standard-1

标签: airflowgoogle-cloud-composer

解决方案


这通常按预期工作。如果您想在本地进行测试,您可能需要在本地安装 Airflow 并在那里测试 DAG 组件。这按预期工作的原因有几个。

  1. Composer 从 Cloud Storage 获取 DAG 需要一点时间。如文档中所述,同步是最终的。
  2. Airflow 在任务延迟方面并不是“最好的”——操作需要几秒钟才能完成的情况并不少见。

我怀疑如果您尝试使用类似的东西来打个招呼世界BashOperator会快得多(只有 GCS 延迟。)


推荐阅读