首页 > 解决方案 > 如何使用 RabbitMQ 并行发送多条消息并一次性使用它们?

问题描述

RabbitMQ 的新手。浏览多个站点后,我可以构建以下程序来并行发送多条消息。

发件人.py

import pika
from threading import Thread
from queue import Queue
import multiprocessing


class MetaClass(type):
    _instance = {}

    def __call__(cls, *args, **kwargs):
        """
        Singleton Design pattern
        if the instance already exist don't create one!
        """
        if cls not in cls._instance:
            cls._instance[cls] = super(MetaClass, cls).__call__(*args, **kwargs)
            return cls._instance[cls]


class RabbitMQConfigure(metaclass=MetaClass):
    def __init__(self, queue='durable_task_queue', host="localhost", routing_key="durable_task_queue", exchange=""):
        """
        Configure RabbitMQ server
        """
        self.queue = queue
        self.host = host
        self.routing_key = routing_key
        self.exchange = exchange


class RabbitMQ(Thread):
    def __init__(self, rabbit_mq_server, queue1):
        Thread.__init__(self)
        self.rabbit_mq_server = rabbit_mq_server
        self.queue1 = queue1

        self._connection = pika.BlockingConnection(
            pika.ConnectionParameters(self.rabbit_mq_server.host))
        self._channel = self._connection.channel()
        self._channel.queue_declare(queue=self.rabbit_mq_server.queue, durable=True)

    def __enter__(self):
        print("__enter__")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print("__exit__")
        self._connection.close()

    def publish(self, message=""):
        print("Inside publish method...")
        self._channel.basic_publish(exchange=self.rabbit_mq_server.exchange,
                                    routing_key=self.rabbit_mq_server.routing_key, body=message)
        print(" [x] Sent %r" % message)

    def run(self):
        i = 0
        while i <= 5:
            # Get the work from the queue and expand the tuple
            message = self.queue1.get()
            print("Inside run method...")
            print("Going to call publish method...")
            print("Message value:" + message)
            self.publish(message=message)
            i += 1


if __name__ == "__main__":
    rabbit_mq_server = RabbitMQConfigure(queue='durable_task_queue', host="localhost", routing_key='durable_task_queue',
                                         exchange="")
    # with RabbitMQ(rabbit_mq_server, message="Hello World!") as rabbitmq:
    #     rabbitmq.publish()

    queue1 = Queue()
    no_of_CPUs = multiprocessing.cpu_count()

    messages = []
    for i in range(5):
        messages.append("Hello world1" + str(i))

    for x in range(2):
        with RabbitMQ(rabbit_mq_server, queue1) as rabbitmq:
            # rabbitmq.daemon = True
            rabbitmq.start()

    # Put the tasks into the queue as a tuple
    for message in messages:
        queue1.put(message)
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue1.join()

但是这个程序总是在不发送任何消息的情况下产生以下输出:

E:\rabbitmq\venv\Scripts\python.exe E:/rabbitmq/work_queues/new_task.py
__enter__
__exit__
__enter__
__exit__ Inside run method...Inside run method... Going to call publish method... Going to call publish method...

Message value:Hello world11Message value:Hello world10 Inside publish method...

Inside publish method... Exception in thread Exception in thread Thread-3: Traceback (most recent call last):   File "C:\Python38\lib\threading.py", line 932, in _bootstrap_inner Thread-1: Traceback (most recent call last):   File "C:\Python38\lib\threading.py", line 932, in _bootstrap_inner
        self.run()   File "E:/rabbitmq/work_queues/new_task.py", line 79, in run self.run()   File "E:/rabbitmq/work_queues/new_task.py", line 79, in run
    self.publish(message=message)   File "E:/rabbitmq/work_queues/new_task.py", line 67, in publish
    self.publish(message=message)   File "E:/rabbitmq/work_queues/new_task.py", line 67, in publish
    self._channel.basic_publish(exchange=self.rabbit_mq_server.exchange,   File "E:\rabbitmq\venv\lib\site-packages\pika\adapters\blocking_connection.py", line 2242, in basic_publish
    self._channel.basic_publish(exchange=self.rabbit_mq_server.exchange,   File "E:\rabbitmq\venv\lib\site-packages\pika\adapters\blocking_connection.py", line 2242, in basic_publish
        self._impl.basic_publish( self._impl.basic_publish(  File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 421, in basic_publish

  File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 421, in basic_publish
    self._raise_if_not_open()   File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 1389, in
_raise_if_not_open
    self._raise_if_not_open()   File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 1389, in
_raise_if_not_open
    raise exceptions.ChannelWrongStateError('Channel is closed.') pika.exceptions.    raise exceptions.ChannelWrongStateError('Channel is closed.') ChannelWrongStateError: Channel is closed.pika.exceptions.ChannelWrongStateError:  Channel is closed. 

是否可以并行发送多条消息?是否可以一次性使用所有这些消息?

标签: python-3.xrabbitmqpika

解决方案


队列是相互独立的,对于每个队列中的单个消息也是如此。不过,您可以控制哪个消费者订阅哪个队列,仅此而已。

如果消息是并行发送和消费的,为什么不创建一个包含所有有效负载的大消息呢?


推荐阅读