首页 > 解决方案 > 过去 5 秒内没有活动或错过太多心跳 [RabbitMQ]

问题描述

我在这里遵循了示例代码:https ://github.com/pika/pika/blob/1.1.0/examples/basic_consumer_threaded.py

但是几个小时后,我的消费者停止了以下回溯。过去 5 秒内没有活动或错过太多心跳(由于以下示例代码,我设置了 5 秒)

Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "main.py", line 44, in do_word
    connection.add_callback_threadsafe(cb)
  File "/opt/cdp/at-cdp-product-analysis-consumer/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 744, in add_callback_threadsafe
    'BlockingConnection.add_callback_threadsafe() called on '
pika.exceptions.ConnectionWrongStateError: BlockingConnection.add_callback_threadsafe() called on closed or closing connection.

Traceback (most recent call last):
  File "main.py", line 66, in <module>
    rabbit.channel.start_consuming()
  File "/opt/cdp/at-cdp-product-analysis-consumer/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 1866, in start_consuming
    self._process_data_events(time_limit=None)
  File "/opt/cdp/at-cdp-product-analysis-consumer/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2027, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/opt/cdp/at-cdp-product-analysis-consumer/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 825, in process_data_events
    self._flush_output(common_terminator)
  File "/opt/cdp/at-cdp-product-analysis-consumer/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.AMQPHeartbeatTimeout: No activity or too many missed heartbeats in the last 5 seconds
Exception in thread Thread-42749:
Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "main.py", line 44, in do_word
    connection.add_callback_threadsafe(cb)
  File "/opt/cdp/at-cdp-product-analysis-consumer/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 744, in add_callback_threadsafe
    'BlockingConnection.add_callback_threadsafe() called on '
pika.exceptions.ConnectionWrongStateError: BlockingConnection.add_callback_threadsafe() called on closed or closing connection.

这是我的代码:

import json
import config
from app import rabbit, elastic
import functools
import threading


# --------------------------------------------
# Ack message
# --------------------------------------------
def ack_message(channel, delivery_tag):
    if channel.is_open:
        channel.basic_ack(delivery_tag)
    else:
        pass


# --------------------------------------------
# Parse message to transactions
# --------------------------------------------
def do_word(connection, channel, delivery_tag, body):

    # Do something

    cb = functools.partial(ack_message, channel, delivery_tag)
    connection.add_callback_threadsafe(cb)


# --------------------------------------------
# Callback function for RabbitMQ Consuming
# --------------------------------------------
def callback(channel, method, properties, body, args):
    connection, threads = args

    t = threading.Thread(target=do_word,
                         args=(connection, channel, method.delivery_tag, body))
    t.start()
    threads.append(t)


if __name__ == "__main__":
    # Consuming config
    threads = []

    # I declare exchange, channel in another python file and call as rabbit object.

    message_callback = functools.partial(callback, args=(rabbit.connection, threads))
    rabbit.channel.basic_consume(queue=config.RABBITMQ_TRANSACTION_QUEUE, on_message_callback=message_callback)

    # Start Consuming
    rabbit.channel.start_consuming()

    # Wait for all to complete
    for thread in threads:
        thread.join()

    rabbit.connection.close()

如果我的心跳配置太低(5 秒)怎么办。我应该禁用心跳吗?

标签: multithreadingrabbitmqpika

解决方案


推荐阅读