c# - C#从Rabbit MQ每x间隔以y批量大小消费/读取消息
问题描述
我使用 Rabbit MQ 按顺序处理消息。现在我有一个场景,不需要立即使用消息,但我想每 1(x) 分钟从 MQ 消费/读取消息,批量大小为 20(y)
所以我可以一次处理这 20 条消息,并在一次调用中保存到数据库,而不是为每条消息调用 20 次。
那么如何在每个 x 间隔上批量接收/消费消息。
我已经看到以下信息,批量使用消息 - RabbitMQ和Rabbitmq 使用 .NET 使用单个同步调用检索多条消息
我试图实现第二个问题的实现(questions/32309155)但没有工作,不明白“consumer.Received”将收到 **_fetchSize ==> 20 ** 意味着,它将在单次读取中收到 20 条消息?或者它将如何工作,因为我将 fetchsize 更改为 10,但 consumer.received 正在接收单个消息。
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.BasicQos(0, 1, false);
channel.ExchangeDeclare("helloExchange", type:"direct");
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false,
arguments: null);
channel.QueueBind("hello", "helloExchange", routingKey:"hello");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
bool canAck = false;
var retryCount = 0;
try
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
// DO PROCESS MESSAGE HERE
Console.WriteLine($"{typeof(MyConsumer).Name} Message consumed {message}");
canAck = true;
}
catch (Exception ex)
{canAck = false;
// LOG ERROR
}
try
{
if (canAck)
{
channel.BasicAck(ea.DeliveryTag, false);
}
else
{
channel.BasicNack(ea.DeliveryTag, false, false);
}
}
catch (AlreadyClosedException ex)
{
Console.WriteLine(ex.Message + " >> RabbitMQ is closed!");
}
};
channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
解决方案
消费者一次只能收到一条消息。获取大小将决定有多少未确认的消息可以在通道上传递给消费者。
因此,例如,考虑 3 的获取大小,将传递 3 条消息,但如果您不确认任何消息,则意味着即使您在队列中收到新消息(第 4 条),它也不会发送到通道(因此未消费),直到您至少确认 3 条初始消息中的一条。
在您的情况下,为了能够处理您的场景并批量使用消息,您可以执行以下操作:
- 将 fetchSize 设置为 20
- 每当您收到新消息时,您都会保存附加到列表
- 一旦达到限制(20),您就开始处理它并确认批处理中的所有消息(从而清理列表)
- 当批处理完成并确认所有消息后,您将开始接收新消息(然后再次启动流程)
即使这在技术上是可行的,我也不会采用这种实现,因为它会使处理重试/错误处理等变得更加复杂。使用消息队列,您可以选择使您的代码具有原子性、幂等性和弹性,这将导致更容易的错误/重试处理。
推荐阅读
- javascript - 我想在另一个页面上设置 iframe src,并通过链接单击发送 URL 变量
- java - 如何使用 Criteria Builder 编写以下 sql 查询?
- android-studio - Gradle 同步失败:原因:尝试访问方法。将 gradle 2.3 更新到 4.0.1 后
- python - 使用字典 python 3.8 计算每个字符的频率
- vb.net - 使用列表框显示excel文件?
- node.js - 使用 knex.js 在数组中插入多个数据时检查数据是否存在
- python - Tensorflow 在验证集上的准确性非常低
- mule - 关于 Mulesoft 和 DataWeave 的问题
- sql - JPA - 确保没有预留与所需预留发生冲突
- reactjs - react-redux state undefined of props - 功能组件