c# - 如何让我的 RabbitMQ 消费者只收到一条消息而不会超时并再次发送?
问题描述
我们有一个问题,如果一条消息处理时间过长,它会从未确认回到就绪状态,然后再次发送。有没有办法防止这种情况?
我们有一个这样初始化的队列消费者类:
private void Initialize()
{
string queueName = _configuration["QueueName"] + _configuration["QueueNameSuffixForSubscriber"];
var factory = CreateConnectionFactory();
var connection = factory.CreateConnection();
_channel = connection.CreateModel();
// The message TTL must match for QueueDeclare() to work.
var arguments = new Dictionary<string, object>();
arguments.Add("x-message-ttl", Convert.ToInt32(_configuration["EventBusMessageTtl"]));
// By default, RabbitMQ dispatches all the messages to the first consumer. You can change this behaviour by
// setting the BasicQos, this controls the no of messages a consumer can receive before it acknowledges it.
_channel.BasicQos(0, 1, false);
_channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += ConsumerReceived;
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
注意这一行:
_channel.BasicQos(0, 1, false);
这就是我们的消费者一次只提取一条消息的方式。但是,如果该消息在发送 ack 之前花费的时间超过 2 分钟,RMQ 将再次发送该消息。我们不希望那样。(处理时间几乎不会超过 2 分钟,但我们不想满足于差不多.)
有没有办法阻止 RMQ 再次发送消息?我可以在处理消息之前发送 ack ,但是我们会立即收到下一条消息,我们也不希望这样。我们希望在接受下一条消息之前等待消息完成处理。
如果我们可以在准备好时从 RMQ 中提取,那将解决它。
这是ConsumerReceived()
方法:
private void ConsumerReceived(object model, BasicDeliverEventArgs eventArgs)
{
try
{
var message = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
InvokeHandlers(eventArgs, message);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred in processing message or invoking handlers.");
}
finally
{
_channel.BasicAck(eventArgs.DeliveryTag, false);
}
}
解决方案
I agree that this seems like an ideal flow for polling instead of a consumer subscription. Typically you would not want to poll since it greatly harms throughput, but in your case, that's exactly what you want.
while (true)
{
BasicGetResult result = channel.BasicGet(queueName, noAck);
if (result == null)
{
// No message available at this time.
// Sleep/delay to avoid cpu and I/O churn
Thread.Sleep(2000);
}
else
{
try
{
IBasicProperties props = result.BasicProperties;
var message = Encoding.UTF8.GetString(result.Body.ToArray());
InvokeHandlers(eventArgs, message);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred in processing message or invoking handlers.");
}
finally
{
_channel.BasicAck(eventArgs.DeliveryTag, false);
}
}
}
推荐阅读
- c# - 运行时 C# RLNET/OPENTK System.MissingMethodException
- google-apps-script - 有没有办法使用 Google 幻灯片中的 Google Apps 脚本获取/添加动画到对象?
- python - 比较两个列表中的列表以在python中进行匹配
- c++ - 在makefile中添加头文件及相关修改
- python - PySpark 在 foreachPartition() 自定义函数中访问 DataFrame 列
- .htaccess - 重定向 https 未按预期工作
- scichart - SciChart - 旋转时 3D 系列数据点峰值发生变化
- json - Angular 数据表,dataStream.pipe 不是函数
- ios - iOS 推送通知在添加 Firebase SDK 后停止工作
- javascript - 使用Javascript将变量作为sqlite列名插入?