首页 > 解决方案 > 当我有两个消费者线程运行时,为什么只有一个 RabbitMQ 消费者接收所有消息?

问题描述

我正在运行测试并试图让两个 RabbitMQ 消费者共享从队列中提取的消息。但是,每次我运行它时,只有一个订阅者/消费者处理所有消息。为什么?

我有一个单独的进程,将 10 条示例消息放入队列。我停止它,然后运行它。

class Program
{
    static void Main(string[] args)
    {
        Task task1 = Task.Factory.StartNew(() => RabbitReceiver.PullFromQueue(2000));
        Task task2 = Task.Factory.StartNew(() => RabbitReceiver.PullFromQueue(1000));
        Task.WaitAll(task1, task2);

        Console.WriteLine("Press any key to close.");
        Console.ReadKey();
    }
}

这是每个线程使用的消费者:

public static class RabbitReceiver
{
    public static void PullFromQueue(int sleepTimeInMilliseconds)
    {
        string hostName = "localhost";
        string queueName = "Sandbox.v1";

        var factory = new ConnectionFactory { HostName = hostName, UserName = "guest", Password = "guest", Port = 6003 };

        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();

        var arguments = new Dictionary<string, object>();
        arguments.Add("x-message-ttl", 864000000); // Not sure why I have to specify this. Got an exception if I didn't.

        channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);

        var consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, eventArgs) =>
        {
            var body = eventArgs.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            Thread.Sleep(sleepTimeInMilliseconds);

            // This should probably go in a finally block.
            channel.BasicAck(eventArgs.DeliveryTag, false);
        };

        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }
}

标签: c#multithreadingrabbitmq

解决方案


如果BasicQos未设置,则会发生这种情况。默认情况下,当消息进入队列时,RabbitMQ 会将所有消息分派给第一个消费者。它不查看消费者未确认消息的数量。

您可以通过设置 来更改此行为BasicQos,这控制消费者在确认之前可以接收的消息数量。

channel.BasicQos(0, 1, false);

参考:https ://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html


推荐阅读