python-2.7 - 单个消费者从多个队列中交替读取
问题描述
我是rabbitMQ的新手,我正在尝试制作一个有3个角色的应用程序:两个生产者和一个消费者。消费者与两个队列相关,这两个队列与两个生产者相关。每个生产者以不同的频率将消息发送到队列。我需要的是消费者从两个生产者那里交替阅读。
例如:
生产者 1:每 2 秒发送一次“Hello” 生产者 2:每 5 秒发送一次“World” 消费者:打印它收到的任何内容
所以消费者应该打印:
你好世界你好世界你好世界...
由于生产者 1 发送消息的频率高于生产者 2,因此消费者在读取了消费者 1 的消息后,需要稍等片刻等待生产者 2 的消息到达(这就是问题所在)
我试图为生产者声明两个队列并将它们链接到消费者,但消费者只打印如下内容:
你好你好世界你好你好世界
谢谢您的帮助!
更新:这是我的代码
生产者 1:
import pika
import sys
message = 'hello'
credentials = pika.PlainCredentials('xxxx', 'xxxx)
connection =pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')
while True:
channel.basic_publish(exchange='', routing_key='hello', body=message)
print('Sent message: {}'.format(message))
connection.sleep(2)
connection.close()
生产者 2:
import pika
import sys
message = 'world'
credentials = pika.PlainCredentials('xxxx', 'xxxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='world')
while True:
channel.basic_publish(exchange='', routing_key='world', body=message)
print('Sent message: {}'.format(message))
connection.sleep(4)
connection.close()
消费者1:
import pika
def callback(ch, method, properties, body):
print('Receive: {}'.format(body))
credentials = pika.PlainCredentials('xxxx', 'xxxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue='hello')
channel.queue_declare(queue='world')
channel.basic_consume(on_message_callback=callback, queue='hello', auto_ack=True)
channel.basic_consume(on_message_callback=callback, queue='world', auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
解决方案
由于消费者只能从单个队列消费,因此您必须确保所有消息都路由到该队列。
然后由消费者来处理消息。它必须使用轮询 API 来获取单个消息。根据发布每条消息的消费者,消费者必须采取不同的行动。它可以保留来自生产者 1 的消息的本地存储,这些消息在来自生产者 2 的消息被执行之前到达。Cosumer 将延迟对它保存在此存储中的消息执行操作,直到对来自生产者 2 的消息执行操作。只有这样,它才会从该商店获取第一条消息并对其采取行动。
编辑:
在您添加到问题中的代码中,您有一个通道(这很好),但有两个消费者,每次调用一个channel.basic_consume
. 两个消费者使用相同的回调方法callback
。正是这种方法必须实现我上面描述的逻辑。
推荐阅读
- c++ - OpenGL视频帧拟合
- html - 如何增加图像轮播的不透明度?
- bash - 从具有 700 万行的输入文本文件中获取包含空格的最后一列的 md5 哈希
- operating-system - 主引导记录显示没有活动分区?
- php - ACF:创建自定义字段类型存储两个值
- java - 无限循环混乱中的Java拆分方法
- markdown - 分数转换为 sup 和 sub 元素
- amazon-web-services - ping 重定向端点后面的端点
- java - 从 double 到 float 的 Narrowing 转换的管理规则是什么?
- facebook-opengraph - Whatsapp 使用什么用户代理来抓取网站?