Socket closed on a Celery worker client using RabbitMQ: missed heartbeats

Problem description

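I created a Flask application server that lets you spawn Celery workers through one endpoint. Through a second endpoint, you can apply tasks to a specific queue that is connected to one of those workers.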
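A minimal sketch of the first endpoint's job: building the command line for a worker that consumes a single queue. The app module name `tasks`, the node-name scheme, and the helper names are hypothetical; the flags themselves (`-A`, `worker`, `-Q`, `-n`, `--without-heartbeat`) are standard `celery` CLI options.

```python
import subprocess
from typing import List


def build_worker_argv(queue: str, app_module: str = "tasks",
                      without_heartbeat: bool = False) -> List[str]:
    """Return the argv for a worker that consumes only `queue` (hypothetical helper)."""
    argv = [
        "celery", "-A", app_module, "worker",
        "-Q", queue,            # consume only from this queue
        "-n", f"{queue}@%h",    # one node name per queue; Celery expands %h to the hostname
        "--loglevel=info",
    ]
    if without_heartbeat:
        argv.append("--without-heartbeat")
    return argv


def spawn_worker(queue: str) -> subprocess.Popen:
    """Start the worker as a child process of the Flask server."""
    return subprocess.Popen(build_worker_argv(queue))


if __name__ == "__main__":
    # Only prints the command; nothing is spawned here.
    print(" ".join(build_worker_argv("sentiment")))
```

On the second endpoint, a task can then be routed to that worker with Celery's `apply_async`, for example `some_task.apply_async(args=[text], queue=queue)`, where `some_task` is a placeholder name.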

Everything works fine until a worker goes unused for more than 2 minutes. Then RabbitMQ logs this message:

rabbit_1         | 2018-12-29 15:51:21.573 [error] <0.30372.0> closing AMQP connection <0.30372.0> (172.18.0.1:50058 -> 172.18.0.4:5672):
rabbit_1         | missed heartbeats from client, timeout: 60s
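The log shows a negotiated timeout of 60 s. As a rough model, assuming RabbitMQ's documented rule that heartbeat frames are expected about every timeout / 2 seconds and that a peer is treated as unreachable after two missed heartbeats, the server-side decision looks like this (all function names are hypothetical):

```python
def heartbeat_interval(timeout: float) -> float:
    """Seconds between heartbeat frames for a negotiated timeout."""
    return timeout / 2


def missed_heartbeats(idle_seconds: float, timeout: float) -> int:
    """Heartbeat intervals that passed with no frame at all from the client."""
    if timeout <= 0:  # a timeout of 0 means heartbeats are disabled
        return 0
    return int(idle_seconds // heartbeat_interval(timeout))


def server_closes(idle_seconds: float, timeout: float) -> bool:
    """True once two or more heartbeats have been missed."""
    return missed_heartbeats(idle_seconds, timeout) >= 2
```

Under this model, a client that sends no frames at all for about 60 s gets disconnected, which fits a worker that sat unused for two minutes. The exact timing inside RabbitMQ may differ by a few seconds.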

Stack trace from the application:

[2018-12-29 16:51:26,526] ERROR in app: Exception on /sentiment [POST]
Traceback (most recent call last):
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/flask/app.py", line 1612, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/flask/app.py", line 1598, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/flask_restplus/api.py", line 325, in wrapper
    resp = resource(*args, **kwargs)
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/flask/views.py", line 84, in view
    return self.dispatch_request(*args, **kwargs)
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/flask_restplus/resource.py", line 44, in dispatch_request
    resp = meth(*args, **kwargs)
  File "/home/konrad/Documents/ubuntu-docs/python-dev/crypto-ticker-sentiment/sentinet/evaluation_server/routes/sentiment.py", line 25, in post
    prediction = predict_sentiment(queue, text)
  File "/home/konrad/Documents/ubuntu-docs/python-dev/crypto-ticker-sentiment/sentinet/evaluation_server/services/evaluation.py", line 17, in predict_sentiment
    task_result = task.get()
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/celery/result.py", line 224, in get
    on_message=on_message,
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/celery/backends/async.py", line 188, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/celery/backends/async.py", line 255, in _wait_for_pending
    on_interval=on_interval):
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/celery/backends/async.py", line 56, in drain_events_until
    yield self.wait_for(p, wait, timeout=1)
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/celery/backends/async.py", line 65, in wait_for
    wait(timeout=timeout)
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/celery/backends/rpc.py", line 63, in drain_events
    return self._connection.drain_events(timeout=timeout)
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/kombu/connection.py", line 301, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/amqp/connection.py", line 491, in drain_events
    while not self.blocking_read(timeout):
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/amqp/connection.py", line 496, in blocking_read
    frame = self.transport.read_frame()
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/amqp/transport.py", line 243, in read_frame
    frame_header = read(7, True)
  File "/home/konrad/anaconda3/lib/python3.6/site-packages/amqp/transport.py", line 426, in _read
    raise IOError('Socket closed')
OSError: Socket closed

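I think this may be related to the Celery worker's heartbeat or to a RabbitMQ issue. I added the --without-heartbeat option when spawning the worker, but it didn't help. Would disabling heartbeats on RabbitMQ help?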
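According to the Celery worker documentation, --without-heartbeat turns off the worker's event heartbeats, which are separate from the AMQP connection heartbeat that RabbitMQ enforces here. The AMQP heartbeat a Celery app requests is controlled by the `broker_heartbeat` setting, and the connection that failed belongs to the Flask process, so such a setting would have to apply there too. A minimal sketch of a config module, assuming Celery 4-style lowercase setting names; the module name, broker URL, and the value 0 are assumptions:

```python
# celeryconfig.py (hypothetical module name), loaded with
# app.config_from_object("celeryconfig") in BOTH the Flask process and the workers.

broker_url = "pyamqp://guest:guest@localhost:5672//"  # assumed broker address
result_backend = "rpc://"  # the rpc backend that appears in the traceback

# Requested AMQP heartbeat timeout in seconds; 0 asks for no heartbeats at all.
# RabbitMQ's guidance is to do this only together with TCP keepalives.
broker_heartbeat = 0
```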

Update

I found the following on the RabbitMQ documentation page, but it doesn't explain how to implement it:

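Disabling heartbeats: heartbeats can be disabled by setting the timeout interval to 0 on both the client and the server side.

Alternatively, a very high value (e.g. 1800 seconds) can be used on both ends to effectively disable heartbeats, since frames are delivered so rarely that they make no practical difference.

Neither practice is recommended unless TCP keepalives are used with a sufficiently low inactivity detection period.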
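The RabbitMQ guidance above ties disabling heartbeats to TCP keepalives. To illustrate what a low inactivity detection period means, this standard-library sketch enables keepalive on a plain socket and returns the approximate time to detect a dead peer: the idle time before the first probe plus `count` unanswered probes. The default values are arbitrary, the `TCP_KEEP*` options are platform-specific (these are the Linux names), and passing such settings to the AMQP client's own socket depends on the client library and is not shown.

```python
import socket


def enable_keepalive(sock: socket.socket, idle: int = 30, interval: int = 10,
                     count: int = 3) -> int:
    """Turn on TCP keepalive and return the approximate dead-peer detection time (s)."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # Only set the tuning options that this platform's socket module exposes.
    for name, value in (("TCP_KEEPIDLE", idle),
                        ("TCP_KEEPINTVL", interval),
                        ("TCP_KEEPCNT", count)):
        opt = getattr(socket, name, None)
        if opt is not None:
            sock.setsockopt(socket.IPPROTO_TCP, opt, value)
    # Idle time before the first probe, plus `count` probes spaced `interval` apart.
    return idle + interval * count


if __name__ == "__main__":
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        print(f"dead peer detected after about {enable_keepalive(s)} s")
```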

Update 2

I added heartbeat = 0 to the rabbitmq.conf file, and it still doesn't work.
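A likely reason this had no effect: the heartbeat timeout is negotiated when the connection opens, so the server's value alone does not decide it. The function below is modeled on how py-amqp (the client under kombu's pyamqp transport) handles `connection.tune`, as far as I can tell from its source; verify it against the installed `amqp` version. The function name is hypothetical.

```python
def negotiate_heartbeat(server: int, client: int) -> int:
    """Effective heartbeat timeout (seconds) after connection tuning.

    If either side proposes 0, the larger value wins; otherwise the smaller
    one does. A client that asks for 0 disables heartbeats regardless.
    """
    if server == 0 or client == 0:
        value = max(server, client)
    else:
        value = min(server, client)
    if client == 0:
        value = 0
    return value
```

Under this rule, heartbeat = 0 in rabbitmq.conf combined with a client that still requests, say, 120 s (Celery's documented default for `broker_heartbeat`, if I recall correctly) yields 120 s, so heartbeats stay on. Likewise, the 60 s in the RabbitMQ log matches RabbitMQ's default server value of 60 s winning over any larger client value.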
