首页 > 解决方案 > RabbitMQ:Publisher Confirm 中的奇怪行为

问题描述

我是 RabbitMQ 的新手,我使用 Publisher Confirm 来确保消息成功传递。

但是我面临一个奇怪的行为,即当我发布一条消息并获取一个序列号时,函数 channel.BasicAcks 被触发 n 次,其中 n 是 DeliveryTag,因此如果此消息的 DeliveryTag 为 5,则函数 channel.BasicAcks 为开火5次?!!

这是我的代码:

            IModel channel = _customRabbitMQ.GetChannel();
            IBasicProperties properties = _customRabbitMQ.GetBasicProperties();
            ulong sequenceNumber = channel.NextPublishSeqNo;

         
            message.SequenceNumber = sequenceNumber.ToString();
            _context.Messages.Update(message);
            await _context.SaveChangesAsync();

            try
            {
                _customRabbitMQ.AddOutstandingConfirm(sequenceNumber, message.Id);
                channel.BasicPublish(message.ExchangeName, message.RoutingKey, properties, Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message)));
                Console.WriteLine("Message Published");
            }
            catch (Exception ex)
            {
                // TODO: log to custom logger here
                Console.WriteLine($"Error => {ex.Message}");
            }

            channel.BasicAcks += async (sender, ea) =>
            {
                try
                {
                    Console.WriteLine("Message Confirmed");

                    using var scope = _provider.CreateScope();
                    var _context = scope.ServiceProvider.GetService<Data.DataContext>();
                    var _customRabbitMQ = scope.ServiceProvider.GetService<CustomRabbitMQ>();

                    Guid messageId = _customRabbitMQ.GetOutstandingConfirm(ea.DeliveryTag);

                    Message message = await _context.Messages.Where(m => m.Id == messageId).FirstOrDefaultAsync();
                    message.Status = MessageStatuses.INQUEUE;
                    _context.Messages.Update(message);
                    await _context.SaveChangesAsync();

                    _customRabbitMQ.RemoveOutstandingConfirm(ea.DeliveryTag);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Error => {ex.Message}");
                }
            };

            channel.BasicNacks += (sender, ea) =>
            {
                Console.WriteLine("Message Not Confirmed");
                _customRabbitMQ.RemoveOutstandingConfirm(ea.DeliveryTag);
            };
        }

那么,为什么会发生这种情况以及如何阻止它并使其仅确认一次?

提前致谢。

标签: rabbitmqmessage-queuerabbitmqctl

解决方案


我认为问题在于每次发布消息时都会初始化一个侦听器。因此,当您使用 发布消息时DT = 1,您通过添加创建一个侦听器channel.BasicAck += fun,并且在添加另一条消息时DT = 2创建另一个侦听器,因此当 DT_2 上的确认返回时,它将被推送到 2 个侦听器,这解释了您收到的原因具有相同编号的交货标签的确认。


推荐阅读