首页 > 解决方案 > 使其他模块可以访问rabbitMQ连接

问题描述

RabbitMQ 最佳实践建议使用长寿命连接,理想情况下将使用和发布连接分开,并将每个线程的通道附加到相应的连接。我正在构建一个分布式系统,其中每个部分都需要使用并将消息发布到系统的其他部分。RabbitMQ 类创建这些连接,将通道附加到它们,并发布消息。另一方面,我有大约 10 个进程,每个进程都在一个线程中,必须通过其“自己的”通道消费/发布。在启动时,每个进程都会创建其通道并绑定其队列。

我的问题是如何启动 RabbitMQ 类的唯一实例,使两个连接“可访问”进程,保持这两个连接处于活动状态并避免打开/关闭通道。我尝试在每个模块中导入消息传递,但是对于每个导入,都有一个类的实例化,因此有两个新的连接。我还尝试向 RabbitMQ 类添加一个单例,以避免对导入进行多次实例化,但没有奏效。

我感谢您的帮助。

消息传递.py

class RabbitMQ:

    def __init__(self):
        self.consume_connection = None
        self.publish_connection = None
        self.initialize_connection()

    def initialize_connection(self):
        self.consume_connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost', socket_timeout=5, client_properties={'connection_name': 'consume_connection'}))
        self.publish_connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost', socket_timeout=5, client_properties={'connection_name': 'publish_connection'}))

    def send_message(self, exchange_name, routing_key, message, channel):
        ...

    def create_consume_channel(self):
        ...

    def create_publish_channel(self):
        ...


Messaging = RabbitMQ()

消费进程.py

...
def connect_messaging(self):
    channel = self.messaging.create_consume_channel() # <-- messaging would be the instance of class RabbitMQ
    channel.basic_qos(prefetch_count=100)

    exchange_name = 'abc'
    channel.exchange_declare(exchange=exchange_name, exchange_type='direct')

    result = channel.queue_declare(queue='queue_name')
    queue_1 = result.method.queue
    channel.queue_bind(exchange=exchange_name, queue=queue_1, routing_key='some_routing_key')
    ...    

    def callback_function(ch, method, properties, body):
        ...
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(callback_function, queue=queue_1, no_ack=False)
    channel.start_consuming()

标签: pythonrabbitmqconnectioninstance

解决方案


推荐阅读