Airflow scheduler throws an exception on startup when parallelism is set very high

Problem description

I'm new to Airflow and am trying to use it to build a data pipeline, but it keeps throwing exceptions. My airflow.cfg looks like this:

executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
sql_alchemy_pool_size = 5
parallelism = 96
dag_concurrency = 96
worker_concurrency = 96
max_threads = 96
broker_url = postgresql+psycopg2://airflow:airflow@localhost/airflow
result_backend = postgresql+psycopg2://airflow:airflow@localhost/airflow

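When I start airflow webserver -p 8080 in one terminal and then airflow scheduler in another, the scheduler throws the exception below. It fails when I set parallelism to a larger value and works fine otherwise. The exact threshold may be machine-specific, but at least this shows that parallelism is the trigger. I have tried running 1000 Python processes on this machine and that works fine, and I have configured Postgres to allow up to 500 database connections, but I still get the error.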

[2019-11-20 12:15:00,820] {dag_processing.py:556} INFO - Launched DagFileProcessorManager with pid: 85050
Process QueuedLocalWorker-18:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 811, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/edward/.local/share/virtualenvs/avat-utils-JpGzQGRW/lib/python3.7/site-packages/airflow/executors/local_executor.py", line 111, in run
    key, command = self.task_queue.get()
  File "<string>", line 2, in get
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 815, in _callmethod
    self._connect()
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 802, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 492, in Client
    c = SocketClient(address)
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 619, in SocketClient
    s.connect(address)
ConnectionRefusedError: [Errno 61] Connection refused
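The traceback suggests where the failure happens. Each QueuedLocalWorker calls self.task_queue.get() on a multiprocessing Manager proxy. In a freshly forked worker, the proxy has no cached connection (the ForkAwareLocal AttributeError), so _connect() opens a new socket to the manager process, and that connect is what gets refused. The sketch below is a minimal model of that pattern, under stated assumptions. It is not Airflow's code: worker, run and SENTINEL are hypothetical names. It starts num_workers processes that share one Manager-backed queue. A plausible reason a high worker count matters is that CPython's manager server listens with a small fixed backlog (16 in the 3.7 sources). When dozens of workers connect at the same moment, macOS may refuse the excess connections. Because this depends on timing, it would also fit the intermittent behavior described in the update.

```python
import multiprocessing as mp

# Hypothetical stand-in for the executor's "no more work" signal.
SENTINEL = None


def worker(task_queue, result_queue):
    """Consume tasks from a Manager queue, similar to a QueuedLocalWorker loop."""
    while True:
        # The first get() in a forked child finds no cached connection on the
        # proxy and opens a new socket to the manager process; that is the
        # _connect() call that raised ConnectionRefusedError in the traceback.
        item = task_queue.get()
        if item is SENTINEL:
            break
        result_queue.put(item * 2)


def run(num_workers, num_tasks):
    """Start num_workers processes that share one Manager-backed task queue."""
    # "fork" matches the default start method of the Python 3.7 build seen in
    # the traceback; it is POSIX-only.
    ctx = mp.get_context("fork")
    with ctx.Manager() as manager:
        task_queue = manager.Queue()
        result_queue = manager.Queue()
        procs = [
            ctx.Process(target=worker, args=(task_queue, result_queue))
            for _ in range(num_workers)
        ]
        for p in procs:
            p.start()
        for i in range(num_tasks):
            task_queue.put(i)
        # One sentinel per worker so every process exits its loop.
        for _ in range(num_workers):
            task_queue.put(SENTINEL)
        for p in procs:
            p.join()
        return sorted(result_queue.get() for _ in range(num_tasks))


if __name__ == "__main__":
    print(run(num_workers=4, num_tasks=8))
```

With a handful of workers this runs cleanly. Raising num_workers toward the question's parallelism = 96 corresponds to the failing setup, but whether it actually reproduces ConnectionRefusedError depends on the OS and timing. Treat it as a model of the code path rather than a guaranteed reproduction.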

Thanks

Update: I tried running it in PyCharm and it works fine there, but in the terminal it fails only some of the time.

Tags: python-3.x, multiprocessing, airflow, airflow-scheduler

Solution


I ran into the same problem. It turned out I had set max_threads=10 in airflow.cfg together with LocalExecutor. Switching to max_threads=2 fixed it.
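The fix in this answer is a single config change. One pitfall is putting the option in the wrong section, which would leave the scheduler still reading its old value. The snippet below is a minimal sketch that assumes the Airflow 1.10.x layout, where max_threads belongs to [scheduler] (newer releases call it parsing_processes) and executor and parallelism belong to [core]. It uses Python's standard configparser to read those values back, so you can confirm the option sits where the scheduler expects it. SAMPLE_CFG and concurrency_settings are hypothetical names, and the sample uses the answer's value of 2.

```python
import configparser

# Hypothetical airflow.cfg excerpt. Section placement assumes Airflow 1.10.x,
# where max_threads lives in [scheduler]; check your own version's defaults.
SAMPLE_CFG = """\
[core]
executor = LocalExecutor
parallelism = 96
dag_concurrency = 96

[scheduler]
max_threads = 2
"""


def concurrency_settings(cfg_text):
    """Read the settings discussed in this thread from airflow.cfg-style text."""
    # interpolation=None: a real airflow.cfg contains %-style log format strings.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return {
        "executor": parser.get("core", "executor"),
        "parallelism": parser.getint("core", "parallelism"),
        # None means the option is missing from [scheduler] (or the section is absent).
        "max_threads": parser.getint("scheduler", "max_threads", fallback=None),
    }


if __name__ == "__main__":
    print(concurrency_settings(SAMPLE_CFG))
```

This only reads the file. ConfigParser.write would discard the comments in airflow.cfg, so the change itself is easier to make by hand.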

