首页 > 解决方案 > 多路复用 RabbitMQ 消息

问题描述

例如,我有 4 个发布计量的来源。我想在一个队列/交换中多路复用/合并所有这些消息

--------+----+----+----+----+        -------+---------+----+---------+---------+
Source1 | M1 | M2 | M3 |    |   =>   Result | M1 | M4 | M2 | M3 | M6 | M5 | M7 |
Source2 | M4 |    |    | M5 |
Source3 |    |    | M6 |    |
Source4 |    |    |    | M7 |
For each queue:
 * Read one message
 * Publish message to the Result queue

在 RabbitMQ 中是否有“本机”方式来执行此操作,或者我应该编写自己的 Consumer/Publisher 吗?

编辑 1

一些例子来澄清,让我们说一段时间后我有

     Processing "window"
        +-+
Source1 |X|XXXXXXXXXXXXX
Source2 |Y|YYYYYYY
Source3 |Z|ZZZZZZZZZZ
Source4 |W|WW
        +-+

然后后来

     Processing "window"
           +-+
Source1 XXX|X|XXXXXXXXXX
Source2 YYY|Y|YYYY
Source3 ZZZ|Z|ZZZZZZZ
Source4 WWW| |
           +-+

然后后来

     Processing "window"
                 +-+
Source1 XXXXXXXXX|X|XXXX
Source2 YYYYYYYY | |
Source3 ZZZZZZZZZ|Z|Z
Source4 WWW      | |
                 +-+

结果消费顺序将是: X Y Z W X Y Z W X Y Z W X Y Z X Y Z X Y Z X Y Z X Y Z X Z X Z X Z X X X

X,Y,Z,W 然后 X,Y,Z,W 然后 X,Y,Z,W 然后 X,Y,Z ... X,Z ...

这样,即使一个来源正在“发送垃圾邮件”,来自其他来源的所有其他消息也有机会被消费。

出于技术/财务原因,我一次只需要使用 1 条消息。

消费者比生产者慢得多,但生产者发布了很多但偶尔。

如果每个源发布到绑定到同一队列的交换,结果可能是XXXXXXXXXXXXXX YYYYYYYY ZZZZZZZZZZZ WWWXXXXX Y XXXXX YYY XXX YYYY ZZZZZZZZZZZ WWW(取决于每个源的发布率)

标签: rabbitmq

解决方案


我认为只需运行一个订阅所有队列的脚本即可实现您想要的。

关键要求是使用单个应用程序线程来处理所有消息,而不管它们来自哪个队列。看起来会根据您使用的语言和客户端库而有所不同 - 如果您使用的是 PHP,那么您必须竭尽全力不要成为单线程,但也许有一些客户端库假设每个回调都在一个单独的工作线程上,并且您需要一些共享资源来阻止它们。

就实际的 RabbitMQ 方面而言,您需要:

  • 为服务器注册订阅以向您推送消息,使用basic.consume; basic.get无论如何,这通常建议使用显式轮询
  • basic.consume对所有呼叫使用单个“通道”
  • 使用手动确认,以便消息保留在队列中,直到您的过程完成
  • 每个队列的预取限制设置为 1basic.qos

如果您有 4 个队列 A、B、C 和 D,当您启动消费者时,它们具有不同数量的消息:

  1. 当你第一次订阅时,预取限制意味着每个队列中的一条消息将被发送到通道;称它们为 A1、B1、C1 和 D1
  2. 客户端库将依次在您的应用程序中为其中的每一个引发一个异步事件
  3. 您的单个工作线程将处理这些事件中的第一个,并开始处理消息 A1
  4. 在您手动确认该消息之前,没有其他消息可以到达
  5. 确认第一条消息 (A1) 后,可以从该队列 (A2) 中预取一条新消息
  6. 同时,您的工作线程将解除阻塞并处理已经引发的下一个事件,用于消息 B1
  7. 只有处理完 B1、C1 和 D1 的待处理事件后,工作线程才会看到消息 A2 的事件
  8. 只要队列有消息等待,它们就会以循环方式处理。即使除了一个队列之外的所有队列都变空了,一旦消息到达,它们就会重新进入轮换状态,因为只有来自繁忙队列的一条消息会被预取,其余的只会在 RabbitMQ 服务器上等待。

推荐阅读