为什么我已经指定了要接收的主题，我的 RabbitMQ 消费者仍然会收到所有消息？


队列中有不同类型的消息，我想创建多个 C# 消费者，让每个消费者只从队列中取出某一种类型的消息。


// The binding in question: binds queueName to the "SandboxEventBus" exchange with routing key "MessageEvent".
channel.QueueBind(queue: queueName, exchange: "SandboxEventBus", routingKey: "MessageEvent");



/// <summary>
/// Demo consumer that reads messages from the "Sandbox.v1" queue and writes them to the console.
/// </summary>
public static class RabbitReceiver
{
    /// <summary>
    /// Binds the queue to the "SandboxEventBus" exchange with routing key "MessageEvent",
    /// then registers a consumer that prints, pauses after, and acknowledges each delivered message.
    /// </summary>
    /// <param name="caller">Label written to the console to identify which consumer handled the message.</param>
    /// <param name="sleepTimeInMilliseconds">Pause, in milliseconds, applied after each message before it is acknowledged.</param>
    public static void PullFromQueue(string caller, int sleepTimeInMilliseconds)
    {
        string hostName = "localhost";
        string queueName = "Sandbox.v1";

        var factory = new ConnectionFactory { HostName = hostName, UserName = "guest", Password = "guest", Port = 6003 };

        // NOTE(review): the connection and channel are not disposed here; the consumer registered
        // below still needs them after this method returns. Confirm who owns their lifetime.
        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();

        // Only referenced by the commented-out QueueDeclare call below.
        var arguments = new Dictionary<string, object>();
        arguments.Add("x-message-ttl", 864000000); // Not sure why I have to specify this. Got an exception if I didn't.
        //arguments.Add("topic", "MessageEvent");

        // Prefetch count of 1: at most one unacknowledged message is delivered to this consumer at a time.
        channel.BasicQos(0, 1, false);

        // Enabling these lines doesn't change anything. It was part of my effort of trying different things.
        //channel.ExchangeDeclare("SandboxEventBus", "topic", durable: true); // topic is case-sensitive
        //channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);

        // NOTE(review): QueueBind adds a binding to the queue; it does not filter what this consumer
        // receives. Every consumer on queueName is handed whatever the queue holds, so if the queue is
        // also bound to other routing keys those messages arrive here too. To consume only one message
        // type, use a dedicated queue bound only to that routing key - verify the queue's bindings.
        channel.QueueBind(queue: queueName, exchange: "SandboxEventBus", routingKey: "MessageEvent");

        var consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, eventArgs) =>
        {
            var body = eventArgs.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine($"Message: {message}");
            Console.WriteLine($"{caller} - Pausing {sleepTimeInMilliseconds} ms before getting next message (so we can watch the Rabbit dashboard update).");

            // Actually perform the pause announced above; the ack is sent only after the pause.
            System.Threading.Thread.Sleep(sleepTimeInMilliseconds);

            // This should probably go in a finally block.
            channel.BasicAck(eventArgs.DeliveryTag, false);
        };

        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }
}


/// <summary>
/// Background service that publishes one SnuhEvent followed by ten MessageEvent messages through CAP.
/// </summary>
public class MessagePublisherService : BackgroundService
{
    private readonly ICapPublisher _capBus;

    /// <summary>Stores the CAP publisher used to send the messages.</summary>
    /// <param name="capPublisher">CAP publisher supplied by the caller.</param>
    public MessagePublisherService(ICapPublisher capPublisher)
    {
        _capBus = capPublisher;
    }

    /// <summary>
    /// Publishes the messages once; any exception is reported to the console instead of propagating.
    /// </summary>
    /// <param name="stoppingToken">Stop signal from the host (not observed by this method).</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // This publishes messages. To get the messages off the queue, run the RabbitMQ project in this solution.
        try
        {
            // To have a different event, add this.
            await _capBus.PublishAsync(nameof(SnuhEvent), new SnuhEvent());
            Console.WriteLine($"Published message {nameof(SnuhEvent)}.");

            for (int i = 1; i <= 10; i++)
            {
                //_capBus.Publish(nameof(MessageEvent), new MessageEvent(i));
                await _capBus.PublishAsync(nameof(MessageEvent), new MessageEvent(i));
                Console.WriteLine($"Published message {i}.");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("In catch block of publisher.");
            // Report the failure details rather than silently discarding the exception.
            Console.WriteLine(ex);
        }
    }
}



以下是 Startup.cs 中的 RabbitMQ 配置代码：

    /// <summary>
    /// Registers CAP with SQL Server storage and the RabbitMQ transport on the "SandboxEventBus" exchange.
    /// </summary>
    /// <param name="services">Service collection that CAP is added to.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddCap(x =>
        {
            x.UseSqlServer("Server=localhost; Database=Sandbox; Integrated Security=true;");

            x.DefaultGroup = "Sandbox";
            x.FailedRetryCount = 3;
            x.FailedRetryInterval = 2; // CAP will invoke the subscriber again if not finished in N seconds.

            x.UseRabbitMQ(conf =>
            {
                conf.ExchangeName = "SandboxEventBus";
                // Parameter named "factory" so it does not shadow the outer CAP options parameter "x".
                conf.ConnectionFactoryOptions = factory =>
                {
                    factory.Uri = BuildRabbitUri();
                };
            });
        });
    }


    /// <summary>
    /// Builds the AMQP connection URI for the local RabbitMQ broker.
    /// </summary>
    /// <returns>A URI of the form amqp://user:password@host:port.</returns>
    public static Uri BuildRabbitUri()
    {
        const string protocol = "amqp";
        const string userName = "guest";
        const string password = "guest";
        const string host = "localhost";
        const string port = "6003";

        // Escape the credentials so that characters such as '@' or ':' cannot break the URI structure.
        string userInfo = $"{Uri.EscapeDataString(userName)}:{Uri.EscapeDataString(password)}";

        return new Uri($"{protocol}://{userInfo}@{host}:{port}");
    }

标签: c#, rabbitmq

