python-3.x - 当并行度设置为很大时,气流调度程序启动异常
问题描述
我是 Airflow 的新手,我正在尝试使用气流来构建数据管道,但它不断出现一些异常。我的airflow.cfg 看起来像这样:
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
sql_alchemy_pool_size = 5
parallelism = 96
dag_concurrency = 96
worker_concurrency = 96
max_threads = 96
broker_url = postgresql+psycopg2://airflow:airflow@localhost/airflow
result_backend = postgresql+psycopg2://airflow:airflow@localhost/airflow
当我在一个终端启动airflow webserver -p 8080
然后airflow scheduler
在另一个终端启动时,调度程序运行将具有以下执行(当我将并行度数设置为更大一些时它失败,否则它工作正常,这可能是特定于计算机的,但至少我们知道它是由并行性导致的)。我已经尝试在我的计算机上运行 1000 个 python 进程并且它运行良好,我已将 Postgres 配置为允许最多 500 个数据库连接,但它仍然给我错误。
[2019-11-20 12:15:00,820] {dag_processing.py:556} INFO - Launched DagFileProcessorManager with pid: 85050
Process QueuedLocalWorker-18:
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 811, in _callmethod
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Users/edward/.local/share/virtualenvs/avat-utils-JpGzQGRW/lib/python3.7/site-packages/airflow/executors/local_executor.py", line 111, in run
key, command = self.task_queue.get()
File "<string>", line 2, in get
File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 815, in _callmethod
self._connect()
File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 802, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 492, in Client
c = SocketClient(address)
File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 619, in SocketClient
s.connect(address)
ConnectionRefusedError: [Errno 61] Connection refused
谢谢
更新:我尝试在 Pycharm 中运行,它在 Pycharm 中运行良好,但有时在终端中失败,有时不是
解决方案
我遇到过同样的问题。原来我已经结合 LocalExecutor 在airflow.cfg 中设置了max_threads=10。切换 max_threads=2 解决了这个问题。
推荐阅读
- bash - 将多个文件与第一列中的文件名合并
- c++ - 如果其他错误:如何使它打印任何数字?
- javascript - Jquery 计算器基础价格 + 25%
- python - 如何逐个单元格比较 2 个数据帧并返回单元格级别的更改
- eclipse - 如何将 Eclipse 工作区导入 IntelliJ
- azure-sql-data-warehouse - Azure SQL 数据仓库 - 分页
- java - 如何使用 Kerberos 在 hive UDF 中获得 HBase 连接?
- asp.net-mvc - 如何在 ASP.Net MVC 中使用 LINQ Lambda 连接三个表并将其放入 ViewModel?
- python-2.7 - sklearn.SVC 中的独立 SVC 行是做什么的?
- azure-functions - Azure 函数计时器 SQL 客户端失败