python-3.x - 如何使用 RabbitMQ 并行发送多条消息并一次性使用它们?
问题描述
RabbitMQ 的新手。浏览多个站点后,我可以构建以下程序来并行发送多条消息。
发件人.py
import pika
from threading import Thread
from queue import Queue
import multiprocessing
class MetaClass(type):
_instance = {}
def __call__(cls, *args, **kwargs):
"""
Singleton Design pattern
if the instance already exist don't create one!
"""
if cls not in cls._instance:
cls._instance[cls] = super(MetaClass, cls).__call__(*args, **kwargs)
return cls._instance[cls]
class RabbitMQConfigure(metaclass=MetaClass):
def __init__(self, queue='durable_task_queue', host="localhost", routing_key="durable_task_queue", exchange=""):
"""
Configure RabbitMQ server
"""
self.queue = queue
self.host = host
self.routing_key = routing_key
self.exchange = exchange
class RabbitMQ(Thread):
def __init__(self, rabbit_mq_server, queue1):
Thread.__init__(self)
self.rabbit_mq_server = rabbit_mq_server
self.queue1 = queue1
self._connection = pika.BlockingConnection(
pika.ConnectionParameters(self.rabbit_mq_server.host))
self._channel = self._connection.channel()
self._channel.queue_declare(queue=self.rabbit_mq_server.queue, durable=True)
def __enter__(self):
print("__enter__")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print("__exit__")
self._connection.close()
def publish(self, message=""):
print("Inside publish method...")
self._channel.basic_publish(exchange=self.rabbit_mq_server.exchange,
routing_key=self.rabbit_mq_server.routing_key, body=message)
print(" [x] Sent %r" % message)
def run(self):
i = 0
while i <= 5:
# Get the work from the queue and expand the tuple
message = self.queue1.get()
print("Inside run method...")
print("Going to call publish method...")
print("Message value:" + message)
self.publish(message=message)
i += 1
if __name__ == "__main__":
rabbit_mq_server = RabbitMQConfigure(queue='durable_task_queue', host="localhost", routing_key='durable_task_queue',
exchange="")
# with RabbitMQ(rabbit_mq_server, message="Hello World!") as rabbitmq:
# rabbitmq.publish()
queue1 = Queue()
no_of_CPUs = multiprocessing.cpu_count()
messages = []
for i in range(5):
messages.append("Hello world1" + str(i))
for x in range(2):
with RabbitMQ(rabbit_mq_server, queue1) as rabbitmq:
# rabbitmq.daemon = True
rabbitmq.start()
# Put the tasks into the queue as a tuple
for message in messages:
queue1.put(message)
# Causes the main thread to wait for the queue to finish processing all the tasks
queue1.join()
但是这个程序总是在不发送任何消息的情况下产生以下输出:
E:\rabbitmq\venv\Scripts\python.exe E:/rabbitmq/work_queues/new_task.py
__enter__
__exit__
__enter__
__exit__ Inside run method...Inside run method... Going to call publish method... Going to call publish method...
Message value:Hello world11Message value:Hello world10 Inside publish method...
Inside publish method... Exception in thread Exception in thread Thread-3: Traceback (most recent call last): File "C:\Python38\lib\threading.py", line 932, in _bootstrap_inner Thread-1: Traceback (most recent call last): File "C:\Python38\lib\threading.py", line 932, in _bootstrap_inner
self.run() File "E:/rabbitmq/work_queues/new_task.py", line 79, in run self.run() File "E:/rabbitmq/work_queues/new_task.py", line 79, in run
self.publish(message=message) File "E:/rabbitmq/work_queues/new_task.py", line 67, in publish
self.publish(message=message) File "E:/rabbitmq/work_queues/new_task.py", line 67, in publish
self._channel.basic_publish(exchange=self.rabbit_mq_server.exchange, File "E:\rabbitmq\venv\lib\site-packages\pika\adapters\blocking_connection.py", line 2242, in basic_publish
self._channel.basic_publish(exchange=self.rabbit_mq_server.exchange, File "E:\rabbitmq\venv\lib\site-packages\pika\adapters\blocking_connection.py", line 2242, in basic_publish
self._impl.basic_publish( self._impl.basic_publish( File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 421, in basic_publish
File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 421, in basic_publish
self._raise_if_not_open() File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 1389, in
_raise_if_not_open
self._raise_if_not_open() File "E:\rabbitmq\venv\lib\site-packages\pika\channel.py", line 1389, in
_raise_if_not_open
raise exceptions.ChannelWrongStateError('Channel is closed.') pika.exceptions. raise exceptions.ChannelWrongStateError('Channel is closed.') ChannelWrongStateError: Channel is closed.pika.exceptions.ChannelWrongStateError: Channel is closed.
是否可以并行发送多条消息?是否可以一次性使用所有这些消息?
解决方案
队列是相互独立的,对于每个队列中的单个消息也是如此。不过,您可以控制哪个消费者订阅哪个队列,仅此而已。
如果消息是并行发送和消费的,为什么不创建一个包含所有有效负载的大消息呢?
推荐阅读
- ios - 重新加载应用程序后,未设置 Bridge。这可能是因为您在 NativeModule 中显式合成了桥接器
- javascript - Javascript单元测试一个简单的文件
- apache-kafka - 何时使用 Kafka 事务 API?
- c# - 如何在 C# 中正确实例化动态派生对象及其成员
- html - HTML 找不到图片
- c# - 如何在现有 JArray 中添加 JObject?
- android-studio - xml布局不会显示在模拟器上
- google-signin - 构建具有 Google Sign 并支持 Mac 的 iOS 应用
- python - QTreeView 不显示项目
- python - 在 MySQL 表中恢复预期的 UTF-8 欧洲字符