首页 > 解决方案 > 为什么当我指定要接收的主题时,我的 RabbitMQ 消费者会接收所有消息?

问题描述

我在队列上有不同的消息类型,我正在尝试创建多个 C# 消费者,每个消费者只从队列中取出某种类型。

显然,我应该能够指定路由键,如此处所示,以获取这些消息:

channel.QueueBind(queue: queueName, exchange: "SandboxEventBus", routingKey: "MessageEvent");

无论我为什么尝试,我都会从队列中获取所有消息,而不仅仅是具有该路由键的消息。我错过了什么?

这是课程:

public static class RabbitReceiver
{
    public static void PullFromQueue(string caller, int sleepTimeInMilliseconds)
    {
        string hostName = "localhost";
        string queueName = "Sandbox.v1";

        var factory = new ConnectionFactory { HostName = hostName, UserName = "guest", Password = "guest", Port = 6003 };

        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();

        var arguments = new Dictionary<string, object>();
        arguments.Add("x-message-ttl", 864000000); // Not sure why I have to specify this. Got an exception if I didn't.
        //arguments.Add("topic", "MessageEvent");

        channel.BasicQos(0, 1, false);

        // Enabling these lines doesn't change anything. It was part of my effort of trying different things.
        //channel.ExchangeDeclare("SandboxEventBus", "topic", durable: true); // topic is case-sensitive
        //channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
        channel.QueueBind(queue: queueName, exchange: "SandboxEventBus", routingKey: "MessageEvent");

        var consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, eventArgs) =>
        {
            var body = eventArgs.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine($"Message: {message}");
            Console.WriteLine($"{caller} - Pausing {sleepTimeInMilliseconds} ms before getting next message (so we can watch the Rabbit dashboard update).");
            Thread.Sleep(sleepTimeInMilliseconds);

            // This should probably go in a finally block.
            channel.BasicAck(eventArgs.DeliveryTag, false);
        };

        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }
}

这是发布代码。我只是发布两种不同的消息类型。

public class MessagePublisherService : BackgroundService
{
    private readonly ICapPublisher _capBus;

    public MessagePublisherService(ICapPublisher capPublisher)
    {
        _capBus = capPublisher;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // This publishes messages. To get the messages off the queue, run the RabbitMQ project in this solution.

        try
        {
            // To have a different event, add this.
            await _capBus.PublishAsync(nameof(SnuhEvent), new SnuhEvent());
            Console.WriteLine($"Published message {nameof(SnuhEvent)}.");

            for (int i = 1; i <= 10; i++)
            {
                //_capBus.Publish(nameof(MessageEvent), new MessageEvent(i));
                await _capBus.PublishAsync(nameof(MessageEvent), new MessageEvent(i));
                Console.WriteLine($"Published message {i}.");
            }                
        }
        catch (Exception ex)
        {
            Console.WriteLine("In catch block of publisher.");
            Console.WriteLine(ex);
        }
    }
}

这是队列。请注意,每种消息类型都显示为不同的路由键。

在此处输入图像描述

这是 Startup.cs 中的 RMQ 代码:

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddCap(x =>
        {
            x.UseSqlServer("Server=localhost; Database=Sandbox; Integrated Security=true;");

            x.DefaultGroup = "Sandbox";
            x.FailedRetryCount = 3;
            x.FailedRetryInterval = 2; // CAP will invoke the subscriber again if not finished in N seconds.

            x.UseRabbitMQ(conf =>
            {
                conf.ExchangeName = "SandboxEventBus";
                conf.ConnectionFactoryOptions = x =>
                {
                    x.Uri = BuildRabbitUri();
                };
            });
        });

        services.AddHostedService<MessagePublisherService>();
    }

    public static Uri BuildRabbitUri()
    {
        string protocol = "amqp";
        string userName = "guest";
        string password = "guest";
        string host = "localhost";
        string port = "6003";

        return new Uri(protocol + "://" + userName + ":" + password + "@" + host + ":" + port);
    }

标签: c#rabbitmq

解决方案


推荐阅读