首页 > 解决方案 > C#从Rabbit MQ每x间隔以y批量大小消费/读取消息

问题描述

我使用 Rabbit MQ 按顺序处理消息。现在我有一个场景,不需要立即使用消息,但我想每 1(x) 分钟从 MQ 消费/读取消息,批量大小为 20(y)

所以我可以一次处理这 20 条消息,并在一次调用中保存到数据库,而不是为每条消息调用 20 次。

那么如何在每个 x 间隔上批量接收/消费消息。

我已经看到以下信息,批量使用消息 - RabbitMQRabbitmq 使用 .NET 使用单个同步调用检索多条消息

我试图实现第二个问题的实现(questions/32309155)但没有工作,不明白“consumer.Received”将收到 **_fetchSize ==> 20 ** 意味着,它将在单次读取中收到 20 条消息?或者它将如何工作,因为我将 fetchsize 更改为 10,但 consumer.received 正在接收单个消息。

using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.BasicQos(0, 1, false);
                channel.ExchangeDeclare("helloExchange", type:"direct");
                channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false,
                    arguments: null);

                channel.QueueBind("hello", "helloExchange", routingKey:"hello");

                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += (model, ea) =>
                {
                    bool canAck = false;
                    var retryCount = 0;



                    try
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                       // DO PROCESS MESSAGE HERE
                        Console.WriteLine($"{typeof(MyConsumer).Name} Message consumed {message}");
                        canAck = true;
                    }
                    catch (Exception ex)
                    {canAck = false;
                      // LOG ERROR
                    }

                    try
                    {
                        if (canAck)
                        {
                            channel.BasicAck(ea.DeliveryTag, false);
                        }
                        else
                        {
                            channel.BasicNack(ea.DeliveryTag, false, false);
                        }
                    }
                    catch (AlreadyClosedException ex)
                    {
                        Console.WriteLine(ex.Message + " >> RabbitMQ is closed!");
                    }
                };
                channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

标签: c#rabbitmq

解决方案


消费者一次只能收到一条消息。获取大小将决定有多少未确认的消息可以在通道上传递给消费者。

因此,例如,考虑 3 的获取大小,将传递 3 条消息,但如果您不确认任何消息,则意味着即使您在队列中收到新消息(第 4 条),它也不会发送到通道(因此未消费),直到您至少确认 3 条初始消息中的一条。

在您的情况下,为了能够处理您的场景并批量使用消息,您可以执行以下操作:

  • 将 fetchSize 设置为 20
  • 每当您收到新消息时,您都会保存附加到列表
  • 一旦达到限制(20),您就开始处理它并确认批处理中的所有消息(从而清理列表)
  • 当批处理完成并确认所有消息后,您将开始接收新消息(然后再次启动流程)

即使这在技术上是可行的,我也不会采用这种实现,因为它会使处理重试/错误处理等变得更加复杂。使用消息队列,您可以选择使您的代码具有原子性、幂等性和弹性,这将导致更容易的错误/重试处理。


推荐阅读