MassTransit AzureServiceBus InvalidOperationException

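Problem description

I'm running into a problem with AzureServiceBus. Intermittently (it happens at random), when a command or event is sent to the bus, I get the exception below, and the command/event is never dispatched, so it is lost.

In addition, the exception sometimes indicates that the service has unsubscribed from some topics.

Has anyone else run into this?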

Exception log

Startup configuration:

    services.AddScoped<IHostedService, MassTransitHostedService>();

    // Create the Autofac ContainerBuilder
    var containerBuilder = new ContainerBuilder();
    containerBuilder.Populate(services);

    // Register ApplicationModule (contains all modules for the application)
    containerBuilder.RegisterModule(new ApplicationModule(Configuration));

    return new AutofacServiceProvider(containerBuilder.Build());

Bus configuration:

    protected override void Load(ContainerBuilder builder)
    {
        builder.Register(y => new ConsumerAndDeliver(y.Resolve<IIncomingMessageDeliver>()))
            .As<ConsumerAndDeliver>()
            .SingleInstance();

        builder.Register(c =>
            {
                IComponentContext ctx = c.Resolve<IComponentContext>();
                var consumer = ctx.Resolve<ConsumerAndDeliver>();
                var bus = CreateBusInAzureServiceBusToRegister(consumer);
                return bus;
            })
            .As<IBusControl>()
            .As<IPublishEndpoint>()
            .As<ISendEndpointProvider>()
            .SingleInstance();

        RegisterQueuesForCommands();
    }

    private IBusControl CreateBusInAzureServiceBusToRegister(object consumer)
    {
        var bus = Bus.Factory.CreateUsingAzureServiceBus(sbc =>
        {
            var host = sbc.Host(_configuration["myConnectionString"], h => { });
            sbc.ReceiveEndpoint(host, _configuration["myQueueName"], e =>
            {
                e.Instance(consumer);
            });
        });
        return bus;
    }

HostedService used to manage the bus lifecycle:

    public class MassTransitHostedService : Microsoft.Extensions.Hosting.IHostedService
    {
        private readonly IBusControl _busControl;

        public MassTransitHostedService(IBusControl busControl)
        {
            _busControl = busControl;
        }

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await _busControl.StartAsync(cancellationToken);
        }

        public async Task StopAsync(CancellationToken cancellationToken)
        {
            await _busControl.StopAsync(TimeSpan.FromSeconds(10));
        }
    }

Send and publish methods:

private readonly ISendEndpointProvider _commandSendEndpoint;
private readonly IPublishEndpoint _eventPublishEndpoint;

private async Task DispatchMessageToBus<T>(T message, CancellationToken cancellationToken = default)
{
    dynamic msg = message;
    switch (message)
    {
        case IOutcomingCommand outcomingCommand:
            var type = outcomingCommand.GetType();
            await _commandSendEndpoint.Send(outcomingCommand, type, cancellationToken);
            break;
        case IOutcomingEvent _:
            await _eventPublishEndpoint.Publish(msg, cancellationToken);
            break;
        default:
            throw new ArgumentException("Dispatch message must be IOutcomingCommand or IOutcomingEvent");
    }
}

Tags: .net-core, azureservicebus, masstransit

Solution

