rabbitmq - 多路复用 RabbitMQ 消息
问题描述
例如,我有 4 个发布计量的来源。我想在一个队列/交换中多路复用/合并所有这些消息
--------+----+----+----+----+ -------+---------+----+---------+---------+
Source1 | M1 | M2 | M3 | | => Result | M1 | M4 | M2 | M3 | M6 | M5 | M7 |
Source2 | M4 | | | M5 |
Source3 | | | M6 | |
Source4 | | | | M7 |
For each queue:
* Read one message
* Publish message to the Result queue
在 RabbitMQ 中是否有“本机”方式来执行此操作,或者我应该编写自己的 Consumer/Publisher 吗?
编辑 1
一些例子来澄清,让我们说一段时间后我有
Processing "window"
+-+
Source1 |X|XXXXXXXXXXXXX
Source2 |Y|YYYYYYY
Source3 |Z|ZZZZZZZZZZ
Source4 |W|WW
+-+
然后后来
Processing "window"
+-+
Source1 XXX|X|XXXXXXXXXX
Source2 YYY|Y|YYYY
Source3 ZZZ|Z|ZZZZZZZ
Source4 WWW| |
+-+
然后后来
Processing "window"
+-+
Source1 XXXXXXXXX|X|XXXX
Source2 YYYYYYYY | |
Source3 ZZZZZZZZZ|Z|Z
Source4 WWW | |
+-+
结果消费顺序将是:
X Y Z W X Y Z W X Y Z W X Y Z X Y Z X Y Z X Y Z X Y Z X Z X Z X Z X X X
X,Y,Z,W 然后 X,Y,Z,W 然后 X,Y,Z,W 然后 X,Y,Z ... X,Z ...
这样,即使一个来源正在“发送垃圾邮件”,来自其他来源的所有其他消息也有机会被消费。
出于技术/财务原因,我一次只需要使用 1 条消息。
消费者比生产者慢得多,但生产者发布了很多但偶尔。
如果每个源发布到绑定到同一队列的交换,结果可能是XXXXXXXXXXXXXX YYYYYYYY ZZZZZZZZZZZ WWW
或
XXXXX Y XXXXX YYY XXX YYYY ZZZZZZZZZZZ WWW
(取决于每个源的发布率)
解决方案
我认为只需运行一个订阅所有队列的脚本即可实现您想要的。
关键要求是使用单个应用程序线程来处理所有消息,而不管它们来自哪个队列。看起来会根据您使用的语言和客户端库而有所不同 - 如果您使用的是 PHP,那么您必须竭尽全力不要成为单线程,但也许有一些客户端库假设每个回调都在一个单独的工作线程上,并且您需要一些共享资源来阻止它们。
就实际的 RabbitMQ 方面而言,您需要:
- 为服务器注册订阅以向您推送消息,使用
basic.consume
;basic.get
无论如何,这通常建议使用显式轮询 basic.consume
对所有呼叫使用单个“通道”- 使用手动确认,以便消息保留在队列中,直到您的过程完成
- 将每个队列的预取限制设置为 1
basic.qos
如果您有 4 个队列 A、B、C 和 D,当您启动消费者时,它们具有不同数量的消息:
- 当你第一次订阅时,预取限制意味着每个队列中的一条消息将被发送到通道;称它们为 A1、B1、C1 和 D1
- 客户端库将依次在您的应用程序中为其中的每一个引发一个异步事件
- 您的单个工作线程将处理这些事件中的第一个,并开始处理消息 A1
- 在您手动确认该消息之前,没有其他消息可以到达
- 确认第一条消息 (A1) 后,可以从该队列 (A2) 中预取一条新消息
- 同时,您的工作线程将解除阻塞并处理已经引发的下一个事件,用于消息 B1
- 只有处理完 B1、C1 和 D1 的待处理事件后,工作线程才会看到消息 A2 的事件
- 只要队列有消息等待,它们就会以循环方式处理。即使除了一个队列之外的所有队列都变空了,一旦消息到达,它们就会重新进入轮换状态,因为只有来自繁忙队列的一条消息会被预取,其余的只会在 RabbitMQ 服务器上等待。
推荐阅读
- android - Wi-fi 未连接到 Android Studio 模拟器
- amazon-web-services - 资源 'data.template_file.user_data' 没有属性 'rendered'
- java - 在 SpringBoot 中提供 Developer 和 Production URL Mail 链接
- jquery - 使用带有 a-frame 的 jquery 接收未定义的错误
- python - Tkinter 按钮链接到带参数的函数给出 TypeError
- java - 无法在另一台 PC 上运行可执行 JAR 文件
- php - 在我将 php 版本从 7.1 切换到 7.2 或 7.2 到 7.1 之前,WebServer 无法工作
- amazon-web-services - 如何在一个 API Gateway REST API 中为多个自定义授权者编写策略?
- python - Qthread 上的 QTimer
- java - Mockito - 如何避免将数据插入数据库?