python - Celery ConnectionResetError:[Errno 104] 对等方重置连接
问题描述
我们正在创建一个应用程序,它由一个前端(flask api)和一个使用 celery 的后端组成。API 启动 celery 任务并检索结果如下:
result = data_source_tasks.add_data_point.delay(tok, uuid, source_type, datum, request_counter)
return result.get(timeout=5)
我们使用 RabbitMQ 作为代理和结果后端:
celery_broker_url = pyamqp://guest@localhost//
celery_result_backend = rpc://
在一切正常运行一段时间后(数千个 api 调用),我收到以下错误:
Traceback (most recent call last):
File "/usr/local/lib/python3.4/dist-packages/flask/app.py", line 1982, in wsgi_app
response = self.full_dispatch_request()
File "/usr/local/lib/python3.4/dist-packages/flask/app.py", line 1614, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/usr/local/lib/python3.4/dist-packages/flask/app.py", line 1517, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/usr/local/lib/python3.4/dist-packages/flask/_compat.py", line 33, in reraise
raise value
File "/usr/local/lib/python3.4/dist-packages/flask/app.py", line 1612, in full_dispatch_request
rv = self.dispatch_request()
File "/usr/local/lib/python3.4/dist-packages/flask/app.py", line 1598, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/usr/local/lib/python3.4/dist-packages/connexion/decorators/decorator.py", line 66, in wrapper
response = function(request)
File "/usr/local/lib/python3.4/dist-packages/connexion/decorators/validation.py", line 122, in wrapper
response = function(request)
File "/usr/local/lib/python3.4/dist-packages/connexion/decorators/validation.py", line 293, in wrapper
return function(request)
File "/usr/local/lib/python3.4/dist-packages/connexion/decorators/decorator.py", line 42, in wrapper
response = function(request)
File "/usr/local/lib/python3.4/dist-packages/connexion/decorators/parameter.py", line 219, in wrapper
return function(**kwargs)
File "/mynedata/lib/api/apicalls.py", line 747, in store_datum
return result.get(timeout=5)
File "/usr/local/lib/python3.4/dist-packages/celery/result.py", line 224, in get
on_message=on_message,
File "/usr/local/lib/python3.4/dist-packages/celery/backends/async.py", line 188, in wait_for_pending
for _ in self._wait_for_pending(result, **kwargs):
File "/usr/local/lib/python3.4/dist-packages/celery/backends/async.py", line 255, in _wait_for_pending
on_interval=on_interval):
File "/usr/local/lib/python3.4/dist-packages/celery/backends/async.py", line 56, in drain_events_until
yield self.wait_for(p, wait, timeout=1)
File "/usr/local/lib/python3.4/dist-packages/celery/backends/async.py", line 65, in wait_for
wait(timeout=timeout)
File "/usr/local/lib/python3.4/dist-packages/celery/backends/rpc.py", line 63, in drain_events
return self._connection.drain_events(timeout=timeout)
File "/usr/local/lib/python3.4/dist-packages/kombu/connection.py", line 301, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File "/usr/local/lib/python3.4/dist-packages/kombu/transport/pyamqp.py", line 103, in drain_events
return connection.drain_events(**kwargs)
File "/usr/local/lib/python3.4/dist-packages/amqp/connection.py", line 471, in drain_events
while not self.blocking_read(timeout):
File "/usr/local/lib/python3.4/dist-packages/amqp/connection.py", line 476, in blocking_read
frame = self.transport.read_frame()
File "/usr/local/lib/python3.4/dist-packages/amqp/transport.py", line 226, in read_frame
frame_header = read(7, True)
File "/usr/local/lib/python3.4/dist-packages/amqp/transport.py", line 401, in _read
s = recv(n - len(rbuf))
ConnectionResetError: [Errno 104] Connection reset by peer
我可以在我启动 celery worker 的控制台中看到任务(以及所有后续任务)成功,但是,result.get 导致此任务和所有后续任务超时。我与结果后端的连接是否以某种方式中断?如果我重新启动 API,既不重新启动 celery worker 也不重新启动 rabbitmq,一切都会再次正常运行。
解决方案
推荐阅读
- heroku - 独立 jar (Heroku) 的部署 - 例外
- jquery - jquery onmouseover 更改 div 与单击相同不起作用
- node.js - Is there a way of stopping a function after a delay? Nodejs
- python - 在python中计算log2时如何实现高精度?
- git - 如何在 gerrit 触发的詹金斯管道作业中获取提交的父 ID
- javascript - javascript 对象不提供带有属性名称的值
- android - 颤振力释放前景
- c# - 从 XML 标签中获取数据
- php - 保存在条目 0 中的数组长度?
- android - 未使用 Actionbar 时未调用 onSupportNavigateUp