c# - 当我有两个消费者线程运行时,为什么只有一个 RabbitMQ 消费者接收所有消息?
问题描述
我正在运行测试并试图让两个 RabbitMQ 消费者共享从队列中提取的消息。但是,每次我运行它时,只有一个订阅者/消费者处理所有消息。为什么?
我有一个单独的进程,将 10 条示例消息放入队列。我停止它,然后运行它。
class Program
{
static void Main(string[] args)
{
Task task1 = Task.Factory.StartNew(() => RabbitReceiver.PullFromQueue(2000));
Task task2 = Task.Factory.StartNew(() => RabbitReceiver.PullFromQueue(1000));
Task.WaitAll(task1, task2);
Console.WriteLine("Press any key to close.");
Console.ReadKey();
}
}
这是每个线程使用的消费者:
public static class RabbitReceiver
{
public static void PullFromQueue(int sleepTimeInMilliseconds)
{
string hostName = "localhost";
string queueName = "Sandbox.v1";
var factory = new ConnectionFactory { HostName = hostName, UserName = "guest", Password = "guest", Port = 6003 };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
var arguments = new Dictionary<string, object>();
arguments.Add("x-message-ttl", 864000000); // Not sure why I have to specify this. Got an exception if I didn't.
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, eventArgs) =>
{
var body = eventArgs.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Thread.Sleep(sleepTimeInMilliseconds);
// This should probably go in a finally block.
channel.BasicAck(eventArgs.DeliveryTag, false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
}
解决方案
如果BasicQos
未设置,则会发生这种情况。默认情况下,当消息进入队列时,RabbitMQ 会将所有消息分派给第一个消费者。它不查看消费者未确认消息的数量。
您可以通过设置 来更改此行为BasicQos
,这控制消费者在确认之前可以接收的消息数量。
channel.BasicQos(0, 1, false);
参考:https ://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
推荐阅读
- docker - 如何安装命令行实用程序以在 puckel docker-airflow docker 容器中使用
- r - 如何在 R 中的表中将标准误差低于估计值,并转移到 flextable
- django - 如何使用 Django 创建系统托盘弹出消息?(视窗)
- pyspark - 将两个列数组合并为配置单元中的 1 个数组列
- ios - 如何让某些 UITextview 包含在标题可访问性转子中?
- android - 使用谷歌登录时将用户重定向到主屏幕
- bash - if-then 循环子目录,如果文件中的行数超过 2
- javascript - 未使用“messageReactionAdd”事件定义消息以执行操作
- html - 如何将设置为 A4 的 html 垂直居中
- javascript - 基于动态条件过滤的最佳方法