首页 > 解决方案 > RabbitMQ,在用节点编写的 pub 子模型中请求被拒绝的消息

问题描述

我正在创建一个简单的聊天应用程序

  1. 可以有多个用户和多个聊天室。
  2. 每个聊天室可以容纳多个用户。
  3. 在服务器启动时建立 rabbitMQ 连接。
  4. 当用户连接到服务器时,会在 rabbitMQ 通道中打开一个套接字连接。
  5. 在从客户端(用户)通过套接字发送消息时,rabbitmq 通道使用特定的路由密钥将其推送到交换。
  6. 在消费时,如果套接字处于活动状态,则通过它发送并确认它。如果不是,请不要确认并重新排队。

{

//Creating a new connection
amqp.connect('amqp://localhost', (err, conn) => {

conn.createChannel(function(err, ch) {

ch.assertExchange(exchangeName, 'direct', { durable: false });
const routingKey = // some routing key;    

wss.on('connection', (ws, req) => {
  // Creating a new queue for the user
  ch.assertQueue('', { exclusive: true, persist: true, durable: true }, (err, q) => {
    // Binds with the routing key
    ch.bindQueue(q.queue, exchangeName, routingKey);

    ch.consume(q.queue, (msg) => {
      if (ws.readyState === 1) {
        ch.ack(msg);
        ws.send(` [-] Received ${msg.content.toString()}`);
      } else {
        // What will come here so that it will be requeued to a new queue with the same routing id?
      }
    }, { noAck: false });
  });

  ws.on('message', (message) => {
    ch.publish(exchangeName, routingKey, new Buffer(message));
    console.log(`[x] Sent ${message} to the exchange with routingKey ${routingKey}`);
  });
}) 

当所有用户都在与服务器连接的套接字中时,这可以正常工作。我想要实现的是,当用户的套接字连接断开时,如果他错过了基于路由键(特定于用户)的消息,那么每当他重新连接到不同的队列时,他应该能够再次接收这些消息。我觉得这是可能的,因为如果没有在任何队列中确认消息,则可以保留消息作为交换。但不确定如何实施。

标签: node.jsrabbitmqmessage-queue

解决方案


让我来解决几个误解......

我想要实现的是,当用户的套接字连接终止时,如果他错过了基于路由键(特定于用户)的消息,那么每当他重新连接到不同的队列时,他应该能够再次接收这些消息

您正在使用独占队列。根据定义,这些队列在其关联的连接和通道消失时被删除。删除队列时,这些队列中的消息将丢失。

我觉得这是可能的,因为如果没有在任何队列中确认消息,则可以保留消息作为交换。

这不是 RabbitMQ 的工作方式。交换用于路由消息,而队列用于存储它们。

以下评论不正确:

如果 user1 和 user2 在不同的主机上,显然他们应该建立不同的通道。“通道”就像连接,如果两个通道使用相同的队列,它们可以接收相同的消息。

在不同的主机上,可能在不同的进程中运行,user1并且user2每个人都会建立自己的连接和通道。两个消费者永远不会从同一个队列中收到相同的消息。

回到你原来的问题,你需要将你的队列声明为非独占和持久的,并以这样的方式命名它们,当你的应用程序重新连接时,应用程序连接到同一个队列。这样,应用程序可以使用保留在队列中的消息。

有多种方法可以让 RabbitMQ 为您删除已记录的未使用队列。


注意: RabbitMQ 团队会监控rabbitmq-users 邮件列表,有时只会在 StackOverflow 上回答问题。


推荐阅读