首页 > 解决方案 > RabbitMQ - Handle consumer side if routing key not found

问题描述

I have code that consumes messages from RabbitMQ. As you can see, the switch/default branch only writes a log entry. Because of this, a message simply disappears if its routing key does not match any of the known routing keys. How can I signal to RabbitMQ that no routing key matched? It is unacceptable for a message to disappear. I tried throwing an exception in the "default" branch, but I don't think that is the best way to handle it.

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace MyApp.API.RabbitMQ
{
    /// <summary>
    /// Background service that consumes messages from the "my.queue" RabbitMQ queue
    /// and dispatches them by routing key.
    /// </summary>
    /// <remarks>
    /// A delivery is acknowledged only when a handler recognised its routing key.
    /// Unrecognised deliveries are rejected without requeue, so the broker can
    /// dead-letter them when a dead-letter exchange is configured for the queue
    /// (for example through a policy). Without one, the broker discards a rejected
    /// message, so configure a dead-letter exchange if messages must never be lost.
    /// </remarks>
    public class ConsumeRabbitMQHostedService : BackgroundService
    {
        private const string QueueName = "my.queue";

        private readonly ILogger logger;

        // Assigned in InitRabbitMQ (called from the constructor), hence not readonly.
        private IConnection connection;
        private IModel channel;

        /// <summary>
        /// Creates the service and opens the RabbitMQ connection and channel.
        /// </summary>
        /// <param name="loggerFactory">Factory used to create this service's logger.</param>
        public ConsumeRabbitMQHostedService(ILoggerFactory loggerFactory)
        {
            logger = loggerFactory.CreateLogger<ConsumeRabbitMQHostedService>();
            InitRabbitMQ();
        }

        /// <summary>
        /// Opens the connection and channel, declares the queue and limits the
        /// consumer to one unacknowledged delivery at a time.
        /// </summary>
        private void InitRabbitMQ()
        {
            ConnectionFactory factory = new ConnectionFactory
            {
                // config files
            };

            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            channel.QueueDeclare(QueueName, false, false, false, null);
            channel.BasicQos(0, 1, false);

            connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
        }

        /// <summary>
        /// Registers the consumer on the queue with manual acknowledgements.
        /// </summary>
        /// <param name="stoppingToken">Token signalled when the host is shutting down.</param>
        /// <returns>A completed task; message handling runs on the client's callbacks.</returns>
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            stoppingToken.ThrowIfCancellationRequested();

            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (ch, ea) =>
            {
                bool handled;
                try
                {
                    string content = Encoding.UTF8.GetString(ea.Body.ToArray());
                    handled = HandleMessage(content, ea.RoutingKey);
                }
                catch (Exception ex)
                {
                    // Settle the delivery even on failure: with a prefetch of 1 an
                    // unsettled delivery would block every following message.
                    // Retry once by requeueing; a redelivered message that fails
                    // again is rejected for good (dead-lettered if configured).
                    logger.LogError(ex, "Failed to handle message with routing key {RoutingKey}", ea.RoutingKey);
                    channel.BasicReject(ea.DeliveryTag, !ea.Redelivered);
                    return;
                }

                if (handled)
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
                else
                {
                    // requeue: false - requeueing an unroutable message would only
                    // redeliver it to this same consumer in an endless loop.
                    channel.BasicReject(ea.DeliveryTag, false);
                }
            };

            consumer.Shutdown += OnConsumerShutdown;
            consumer.Registered += OnConsumerRegistered;
            consumer.Unregistered += OnConsumerUnregistered;
            consumer.ConsumerCancelled += OnConsumerConsumerCancelled;

            channel.BasicConsume(QueueName, false, consumer);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Dispatches a message to the handler matching its routing key.
        /// </summary>
        /// <param name="content">Message body decoded as UTF-8 text.</param>
        /// <param name="routingKey">Routing key the message was published with.</param>
        /// <returns>
        /// <c>true</c> when the routing key was recognised and the message handled;
        /// <c>false</c> when no handler exists for the routing key.
        /// </returns>
        private bool HandleMessage(string content, string routingKey)
        {
            logger.LogInformation("Consumer received {Content}", content);
            switch (routingKey)
            {
                case "specific_routing_key":
                    // some code
                    return true;
                default:
                    logger.LogWarning("{RoutingKey} did not match any of existing routing keys", routingKey);
                    return false;
            }
        }

        private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }
        private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }
        private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }
        private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }
        private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }

        /// <summary>
        /// Closes the channel and connection if they are open, then disposes the base service.
        /// </summary>
        public override void Dispose()
        {
            // Either field is null when InitRabbitMQ failed part-way through.
            if (channel?.IsOpen == true)
            {
                channel.Close();
            }

            if (connection?.IsOpen == true)
            {
                connection.Close();
            }

            base.Dispose();
        }
    }
}

标签: c#, asp.net-core, rabbitmq

解决方案


您需要使用 mandatory(强制)标志,请参阅:

https://www.rabbitmq.com/dotnet-api-guide.html#basic-return

如果一条消息发布时设置了“强制”标志,但无法传递,代理会将其返回给发送客户端(通过 basic.return AMQP 0-9-1 命令)。

要收到此类返回的通知,客户端可以订阅 IModel.BasicReturn 事件。如果没有附加到事件的侦听器,则返回的消息将被静默丢弃。

model.BasicReturn += new RabbitMQ.Client.Events.BasicReturnEventHandler(...);

例如,如果客户端向一个未绑定任何队列的“direct”类型交换机发布了一条设置了“强制”(mandatory)标志的消息,就会触发 BasicReturn 事件。


推荐阅读