首页 > 解决方案 > 使用 Azure 服务总线主题时从 _error 队列中移动消息

问题描述

我正在尝试使用 Azure 服务总线主题来允许服务使用 MassTransit 使用消息。一条消息发布到一个主题,每个订阅该主题的服务都会收到该消息的副本。每个消息类型都有一个主题。

问题是一旦解决了异常的来源,如何处理错误。一旦消息出现故障并最终进入 _error 队列,我想在消息和/或服务修复后将消息移回以进行处理。我无法将消息从 _error 队列移动到主题,因为该主题上的每个服务都会再次收到消息。

我尝试使用名为 _errorecovery 的 ReceiveEndpoint 方法创建第二个队列,但这样做会导致队列订阅该主题,这意味着 _errorrecovery 队列会获取发布到该主题的每条消息。

我想知道是否有一种方法可以使用 MassTransit 设置队列,该队列仅处理该队列中的消息而不添加额外的订阅。

这是我当前构建主题的设置。

TEvent 是消息类型,而 TConsumer 是该消息类型的关联 IConsumer 实现。

  public void ConfigureType<TEvent, TConsumer>(IServiceBusBusFactoryConfigurator busConfig, Container container, MessageHandlingOptions options) where TConsumer : class, IConsumer
        {
            string subName = NameHelper.GetSubscriptionName(@namespace, _serviceName);
            var topicName = NameHelper.GetTopicName(@namespace, typeof(TEvent));

            busConfig.SubscriptionEndpoint(subName, topicName, configurator =>
            {
                configurator.ConfigureConsumer(container, typeof(TConsumer));
                if (!(options is null))
                {
                    ConfigureRetry(configurator, options);
                }
            });
        }

并建立 _errorrecovery 队列。每个事件还有一个关联的 IConsumer,专门用于处理失败的事件。

 var subName = NameHelper.GetSubscriptionName(@namespace, _serviceName);

                        busConfig.ReceiveEndpoint(subName + "_errorrecovery", config =>
                        {
                            config.ConfigureConsumer(_simpleContainer, faultConsumers.Select(i => i.GenericType).ToArray());
                        });

这将生成一个名为 subname_errorrecovery 的队列和一个以事件命名的主题。该服务在主题中有订阅,但 _errorrecovery 也是如此。因此,每次向 Topic 发送消息时,事件的消费者和错误的消费者都会收到消息。

因此,我正在寻找一种将服务连接到恢复队列以及多个主题的方法,而该队列也不会订阅每个主题。

我也很有可能以错误的方式处理这个问题。也许我只需要添加对重复消息的检查并将消息移动到主题,允许最初成功处理消息的服务简单地忽略它。这是我无论如何都计划做的事情,但我希望获得一些关于 MassTransit 错误处理的指导。

任何帮助,将不胜感激。

标签: azureservicebusmasstransitazure-servicebus-queuesazure-servicebus-topics

解决方案


此问题的开箱即用解决方案是添加主题订阅规则。因此,每当您要重新提交消息时,它只会根据订阅上应用的过滤器进入那些订阅。通过此处的文章了解主题订阅规则。


推荐阅读