multithreading - 过去 5 秒内没有活动或错过太多心跳 [RabbitMQ]
问题描述
我在这里遵循了示例代码:https ://github.com/pika/pika/blob/1.1.0/examples/basic_consumer_threaded.py
但是几个小时后,我的消费者停止了以下回溯。过去 5 秒内没有活动或错过太多心跳(由于以下示例代码,我设置了 5 秒)
Traceback (most recent call last):
File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "main.py", line 44, in do_word
connection.add_callback_threadsafe(cb)
File "/opt/cdp/at-cdp-product-analysis-consumer/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 744, in add_callback_threadsafe
'BlockingConnection.add_callback_threadsafe() called on '
pika.exceptions.ConnectionWrongStateError: BlockingConnection.add_callback_threadsafe() called on closed or closing connection.
Traceback (most recent call last):
File "main.py", line 66, in <module>
rabbit.channel.start_consuming()
File "/opt/cdp/at-cdp-product-analysis-consumer/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 1866, in start_consuming
self._process_data_events(time_limit=None)
File "/opt/cdp/at-cdp-product-analysis-consumer/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2027, in _process_data_events
self.connection.process_data_events(time_limit=time_limit)
File "/opt/cdp/at-cdp-product-analysis-consumer/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 825, in process_data_events
self._flush_output(common_terminator)
File "/opt/cdp/at-cdp-product-analysis-consumer/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
raise self._closed_result.value.error
pika.exceptions.AMQPHeartbeatTimeout: No activity or too many missed heartbeats in the last 5 seconds
Exception in thread Thread-42749:
Traceback (most recent call last):
File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "main.py", line 44, in do_word
connection.add_callback_threadsafe(cb)
File "/opt/cdp/at-cdp-product-analysis-consumer/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 744, in add_callback_threadsafe
'BlockingConnection.add_callback_threadsafe() called on '
pika.exceptions.ConnectionWrongStateError: BlockingConnection.add_callback_threadsafe() called on closed or closing connection.
这是我的代码:
import json
import config
from app import rabbit, elastic
import functools
import threading
# --------------------------------------------
# Ack message
# --------------------------------------------
def ack_message(channel, delivery_tag):
if channel.is_open:
channel.basic_ack(delivery_tag)
else:
pass
# --------------------------------------------
# Parse message to transactions
# --------------------------------------------
def do_word(connection, channel, delivery_tag, body):
# Do something
cb = functools.partial(ack_message, channel, delivery_tag)
connection.add_callback_threadsafe(cb)
# --------------------------------------------
# Callback function for RabbitMQ Consuming
# --------------------------------------------
def callback(channel, method, properties, body, args):
connection, threads = args
t = threading.Thread(target=do_word,
args=(connection, channel, method.delivery_tag, body))
t.start()
threads.append(t)
if __name__ == "__main__":
# Consuming config
threads = []
# I declare exchange, channel in another python file and call as rabbit object.
message_callback = functools.partial(callback, args=(rabbit.connection, threads))
rabbit.channel.basic_consume(queue=config.RABBITMQ_TRANSACTION_QUEUE, on_message_callback=message_callback)
# Start Consuming
rabbit.channel.start_consuming()
# Wait for all to complete
for thread in threads:
thread.join()
rabbit.connection.close()
如果我的心跳配置太低(5 秒)怎么办。我应该禁用心跳吗?
- 鼠兔:1.1.0
解决方案
推荐阅读
- python - 来自 Django 的开发服务器端口绑定错误的 ptvsd 远程调试
- r - 如何在 R Studio 中从数据表中绘制选定的行
- python - 使用 sshtunnel 模块的 SSH 隧道不允许空密码
- grails - 如何更改 grails 项目中的 URL?
- javascript - 试图避免在ajaxcall之后滚动条返回开始
- r - 优化例程在自动编码器包中的自动编码功能中给出错误
- c# - 将 Office 互操作图像附件转换为 OpenXML
- java - Apache CXF JAXB Marshaller 未正确编组 @XmlAttribute
- dialogflow-es - 如何从 WebHook 客户端检索登录认证结果?
- javascript - 在使用 webpack 转译 typescript 代码之前包含 XMLHttpRequest