首页 > 解决方案 > 如何让我的 RabbitMQ 消费者只收到一条消息而不会超时并再次发送?

问题描述

我们有一个问题,如果一条消息处理时间过长,它会从未确认回到就绪状态,然后再次发送。有没有办法防止这种情况?

我们有一个这样初始化的队列消费者类:

    private void Initialize()
    {
        string queueName = _configuration["QueueName"] + _configuration["QueueNameSuffixForSubscriber"];

        var factory = CreateConnectionFactory();

        var connection = factory.CreateConnection();
        _channel = connection.CreateModel();

        // The message TTL must match for QueueDeclare() to work.
        var arguments = new Dictionary<string, object>();
        arguments.Add("x-message-ttl", Convert.ToInt32(_configuration["EventBusMessageTtl"]));

        // By default, RabbitMQ dispatches all the messages to the first consumer. You can change this behaviour by
        // setting the BasicQos, this controls the no of messages a consumer can receive before it acknowledges it.
        _channel.BasicQos(0, 1, false);

        _channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += ConsumerReceived;

        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }

注意这一行:

_channel.BasicQos(0, 1, false);

这就是我们的消费者一次只提取一条消息的方式。但是,如果该消息在发送 ack 之前花费的时间超过 2 分钟,RMQ 将再次发送该消息。我们不希望那样。(处理时间几乎不会超过 2 分钟,但我们不想满足于差不多.)

有没有办法阻止 RMQ 再次发送消息?我可以在处理消息之前发送 ack ,但是我们会立即收到下一条消息,我们也不希望这样。我们希望在接受下一条消息之前等待消息完成处理。

如果我们可以在准备好时从 RMQ 中提取,那将解决它。

这是ConsumerReceived()方法:

    private void ConsumerReceived(object model, BasicDeliverEventArgs eventArgs)
    {
        try
        {
            var message = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
            InvokeHandlers(eventArgs, message);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error occurred in processing message or invoking handlers.");
        }
        finally
        {
            _channel.BasicAck(eventArgs.DeliveryTag, false);
        }
    }

标签: c#rabbitmq

解决方案


I agree that this seems like an ideal flow for polling instead of a consumer subscription. Typically you would not want to poll since it greatly harms throughput, but in your case, that's exactly what you want.

while (true)
{
    BasicGetResult result = channel.BasicGet(queueName, noAck);
    if (result == null) 
    {
        // No message available at this time.
        // Sleep/delay to avoid cpu and I/O churn
        Thread.Sleep(2000);
    } 
    else 
    {
        try 
        {
            IBasicProperties props = result.BasicProperties;
            var message = Encoding.UTF8.GetString(result.Body.ToArray());
            InvokeHandlers(eventArgs, message);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error occurred in processing message or invoking handlers.");
        }
        finally
        {
            _channel.BasicAck(eventArgs.DeliveryTag, false);
        }
    }
}

推荐阅读