c# - 为什么当我指定要接收的主题时,我的 RabbitMQ 消费者会接收所有消息?
问题描述
我在队列上有不同的消息类型,我正在尝试创建多个 C# 消费者,每个消费者只从队列中取出某种类型。
显然,我应该能够指定路由键,如此处所示,以获取这些消息:
channel.QueueBind(queue: queueName, exchange: "SandboxEventBus", routingKey: "MessageEvent");
无论我为什么尝试,我都会从队列中获取所有消息,而不仅仅是具有该路由键的消息。我错过了什么?
这是课程:
public static class RabbitReceiver
{
public static void PullFromQueue(string caller, int sleepTimeInMilliseconds)
{
string hostName = "localhost";
string queueName = "Sandbox.v1";
var factory = new ConnectionFactory { HostName = hostName, UserName = "guest", Password = "guest", Port = 6003 };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
var arguments = new Dictionary<string, object>();
arguments.Add("x-message-ttl", 864000000); // Not sure why I have to specify this. Got an exception if I didn't.
//arguments.Add("topic", "MessageEvent");
channel.BasicQos(0, 1, false);
// Enabling these lines doesn't change anything. It was part of my effort of trying different things.
//channel.ExchangeDeclare("SandboxEventBus", "topic", durable: true); // topic is case-sensitive
//channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
channel.QueueBind(queue: queueName, exchange: "SandboxEventBus", routingKey: "MessageEvent");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, eventArgs) =>
{
var body = eventArgs.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Message: {message}");
Console.WriteLine($"{caller} - Pausing {sleepTimeInMilliseconds} ms before getting next message (so we can watch the Rabbit dashboard update).");
Thread.Sleep(sleepTimeInMilliseconds);
// This should probably go in a finally block.
channel.BasicAck(eventArgs.DeliveryTag, false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
}
这是发布代码。我只是发布两种不同的消息类型。
public class MessagePublisherService : BackgroundService
{
private readonly ICapPublisher _capBus;
public MessagePublisherService(ICapPublisher capPublisher)
{
_capBus = capPublisher;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// This publishes messages. To get the messages off the queue, run the RabbitMQ project in this solution.
try
{
// To have a different event, add this.
await _capBus.PublishAsync(nameof(SnuhEvent), new SnuhEvent());
Console.WriteLine($"Published message {nameof(SnuhEvent)}.");
for (int i = 1; i <= 10; i++)
{
//_capBus.Publish(nameof(MessageEvent), new MessageEvent(i));
await _capBus.PublishAsync(nameof(MessageEvent), new MessageEvent(i));
Console.WriteLine($"Published message {i}.");
}
}
catch (Exception ex)
{
Console.WriteLine("In catch block of publisher.");
Console.WriteLine(ex);
}
}
}
这是队列。请注意,每种消息类型都显示为不同的路由键。
这是 Startup.cs 中的 RMQ 代码:
public void ConfigureServices(IServiceCollection services)
{
services.AddCap(x =>
{
x.UseSqlServer("Server=localhost; Database=Sandbox; Integrated Security=true;");
x.DefaultGroup = "Sandbox";
x.FailedRetryCount = 3;
x.FailedRetryInterval = 2; // CAP will invoke the subscriber again if not finished in N seconds.
x.UseRabbitMQ(conf =>
{
conf.ExchangeName = "SandboxEventBus";
conf.ConnectionFactoryOptions = x =>
{
x.Uri = BuildRabbitUri();
};
});
});
services.AddHostedService<MessagePublisherService>();
}
public static Uri BuildRabbitUri()
{
string protocol = "amqp";
string userName = "guest";
string password = "guest";
string host = "localhost";
string port = "6003";
return new Uri(protocol + "://" + userName + ":" + password + "@" + host + ":" + port);
}
解决方案
推荐阅读
- python-3.6 - mlflow.exceptions.RestException:RESOURCE_DOES_NOT_EXIST:不存在 id=0 的实验
- php - 服务器上不同的 Google api 结果
- android - 升级应用内购买订阅android的问题(从服务器检索信息时出错。DF-DFERH-01)
- spring-boot - 使用 Spring Boot 将 xml 发送到活动 MQ 中的队列
- python - 将关键字与 PDF 文件进行比较
- vue.js - Vue:是否可以操作通过插槽传递的数据
- python - 使用 WSGI 在 IIS 上托管 FastAPI 时出现问题
- tensorflow - GPU停止在谷歌云虚拟机上工作的原因是什么?
- c# - 用几行替换行,用 OpenXml 替换 Word
- python - 在 django 中使用多线程冻结站点