首页 > 解决方案 > 如何使用 docker-compose 在分布式气流架构上配置 celery worker?

问题描述

我正在建立一个分布式 Airflow 集群,除了 celery worker 之外的所有其他内容都在一台主机上运行,​​并且在多台主机上完成处理。使用 Airflow 文档https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml中给出的 yaml 文件配置气流 2.0 设置。在我最初的测试中,当我在同一台主机上运行所有东西时,我的架构可以很好地工作。问题是,如何在远程主机上启动 celery worker?

到目前为止,我尝试创建上述 docker-compose 的精简版本,其中我只在工作主机上启动 celery 工作人员,没有别的。但是我遇到了一些与数据库连接有关的问题。在修剪后的版本中,我更改了 URL,以便它们指向运行 db 和 redis 的主机。

dag、日志、插件和 postgresql 数据库位于所有主机可见的共享驱动器上。

我应该如何进行配置?任何想法要检查什么?连接等?Celery worker docker-compose 配置:

---
version: '3'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment:
    &airflow-common-env
    AIRFLOW_UID: 50000
    AIRFLOW_GID: 50000
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: 
postgresql+psycopg2://airflow:airflow@airflowhost.example.com:8080/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow@airflowhost.example.com:8080/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@airflow@airflowhost.example.com:6380/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    REDIS_PORT: 6380
   volumes:
    - /airflow/dev/dags:/opt/airflow/dags
    - /airflow/dev/logs:/opt/airflow/logs
    - /airflow/dev/plugins:/opt/airflow/plugins
   user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
services:
  airflow-remote-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

编辑 1:我仍然对日志文件有一些困难。看来共享日志目录并不能解决丢失日志文件的问题。我像建议的那样在 main 上添加了 extra_host 定义,并在工作机器上打开了端口 8793。工作任务失败并显示日志:

*** Log file does not exist: 
/opt/airflow/logs/tutorial/print_date/2021-07- 
01T13:57:11.087882+00:00/1.log
*** Fetching from: http://:8793/log/tutorial/print_date/2021-07-01T13:57:11.087882+00:00/1.log
*** Failed to fetch log file from worker. Unsupported URL protocol ''

标签: dockerdocker-composeceleryairflow

解决方案


这些设置远非“最终设置”,而是使用核心节点和工作人员中 Airflow 的 docker-compose 对我有用的一些设置:

主节点:

  • 工作节点必须可以从Webserver运行的主节点访问。我发现这个架构图CeleryExecutor整理事情很有帮助。

    在尝试读取日志时,如果在本地找不到它们,它将尝试从远程工作人员那里检索它们。因此,您的主节点可能不知道您的工作人员的主机名,因此您要么更改主机名的解析方式(hostname_callable设置,默认为socket.getfqdn),要么只需将名称解析功能添加到Webserver. 这可以通过在定义中添加extra_hosts配置键来完成:x-airflow-common

---
version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment: &airflow-common-env
    ...# env vars
  extra_hosts:
    - "worker-01-hostname:worker-01-ip-address" # "worker-01-hostname:192.168.0.11"
    - "worker-02-hostname:worker-02-ip-address"

*请注意,在您拥有共享驱动器的特定情况下,我认为日志将在本地找到。

  • 定义并行性DAG 并发性调度程序解析过程。可以通过使用 env vars 来完成:
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment: &airflow-common-env
    AIRFLOW__CORE__PARALLELISM: 64
    AIRFLOW__CORE__DAG_CONCURRENCY: 32
    AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4

当然,要设置的值取决于您的具体情况和可用资源。这篇文章很好地概述了这个主题。DAG 设置也可以在DAG定义时被覆盖。

工作节点:

  • 定义 worker CELERY__WORKER_CONCURRENCY,默认可以是机器上可用的 CPU 数量(文档)。

  • 定义如何访问在主节点中运行的服务。设置 IP 或主机名并注意主节点中匹配的暴露端口:

x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment: &airflow-common-env
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CELERY__WORKER_CONCURRENCY: 8
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@main_node_ip_or_hostname:5432/airflow # 5432 is default postgres port
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@main_node_ip_or_hostname:5432/airflow
  AIRFLOW__CELERY__BROKER_URL: redis://:@main_node_ip_or_hostname:6379/0
  environment: &airflow-common-env
    AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
    AIRFLOW__WEBSERVER__SECRET_KEY: ${SECRET_KEY}

  env_file:
    - .env

.env 文件:FERNET_KEY=jvYUaxxxxxxxxxxxxx=

services:
  airflow-worker:
    <<: *airflow-common
    hostname: ${HOSTNAME}
    ports:
      - 8793:8793
    command: celery worker
    restart: always
  • 确保每个工作节点主机都以相同的时间配置运行,几分钟的差异可能会导致严重的执行错误,这可能并不那么容易找到。考虑在主机操作系统上启用 NTP 服务。

如果您通常有繁重的工作负载和高并发性,您可能需要调整 Postgres 设置,例如max_connectionsshared_buffers。这同样适用于主机操作系统网络设置,例如ip_local_port_rangesomaxconn

在我在初始集群设置期间遇到的任何问题中,Flowerworker 执行日志总是提供有用的详细信息和错误消息,包括任务级日志和 Docker-Compose 服务日志,即:docker-compose logs --tail=10000 airflow-worker > worker_logs.log.

希望对你有用!


推荐阅读