首页 > 解决方案 > 单个消费者从多个队列中交替读取

问题描述

我是rabbitMQ的新手,我正在尝试制作一个有3个角色的应用程序:两个生产者和一个消费者。消费者与两个队列相关,这两个队列与两个生产者相关。每个生产者以不同的频率将消息发送到队列。我需要的是消费者从两个生产者那里交替阅读。

例如:

生产者 1:每 2 秒发送一次“Hello” 生产者 2:每 5 秒发送一次“World” 消费者:打印它收到的任何内容

所以消费者应该打印:

你好世界你好世界你好世界...

由于生产者 1 发送消息的频率高于生产者 2,因此消费者在读取了消费者 1 的消息后,需要稍等片刻等待生产者 2 的消息到达(这就是问题所在)

我试图为生产者声明两个队列并将它们链接到消费者,但消费者只打印如下内容:

你好你好世界你好你好世界

谢谢您的帮助!

更新:这是我的代码

生产者 1:

import pika
import sys

message = 'hello'


credentials = pika.PlainCredentials('xxxx', 'xxxx)
connection =pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')

while True:
    channel.basic_publish(exchange='', routing_key='hello', body=message)
    print('Sent message: {}'.format(message))
    connection.sleep(2)

connection.close()

生产者 2:

import pika
import sys

message = 'world'


credentials = pika.PlainCredentials('xxxx', 'xxxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()


channel.queue_declare(queue='world')

while True:
    channel.basic_publish(exchange='', routing_key='world', body=message)
    print('Sent message: {}'.format(message))
    connection.sleep(4)

connection.close()

消费者1:

import pika

def callback(ch, method, properties, body):
    print('Receive: {}'.format(body))


credentials = pika.PlainCredentials('xxxx', 'xxxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue='hello')
channel.queue_declare(queue='world')

channel.basic_consume(on_message_callback=callback, queue='hello', auto_ack=True)
channel.basic_consume(on_message_callback=callback, queue='world', auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')


channel.start_consuming()

标签: python-2.7rabbitmq

解决方案


由于消费者只能从单个队列消费,因此您必须确保所有消息都路由到该队列。

然后由消费者来处理消息。它必须使用轮询 API 来获取单个消息。根据发布每条消息的消费者,消费者必须采取不同的行动。它可以保留来自生产者 1 的消息的本地存储,这些消息在来自生产者 2 的消息被执行之前到达。Cosumer 将延迟对它保存在此存储中的消息执行操作,直到对来自生产者 2 的消息执行操作。只有这样,它才会从该商店获取第一条消息并对其采取行动。

编辑:

在您添加到问题中的代码中,您有一个通道(这很好),但有两个消费者,每次调用一个channel.basic_consume. 两个消费者使用相同的回调方法callback。正是这种方法必须实现我上面描述的逻辑。


推荐阅读