.net-core - MassTransit AzureServieBus InvalidOperationException
问题描述
我在使用 AzureServiceBus 时遇到问题。有时(它随机发生)当命令或事件被抛出到总线时,我得到这个异常(下图)并且命令/事件没有被触发,所以我错过了它。
此外,有时异常表示该服务取消了某些主题的订阅。
有人发生过吗?
启动配置:
services.AddScoped<IHostedService, MassTransitHostedService>();
//Create Autofac ContainerBuilder
var containerBuilder = new ContainerBuilder();
containerBuilder.Populate(services);
// register ApplicationModule (contains all modules for application)
containerBuilder.RegisterModule(new ApplicationModule(Configuration));
return new AutofacServiceProvider(containerBuilder.Build());
总线配置:
protected override void Load(ContainerBuilder builder)
{
builder.Register(y => new ConsumerAndDeliver(y.Resolve<IIncomingMessageDeliver>())).As<ConsumerAndDeliver>().SingleInstance();
builder.Register(c =>
{
IComponentContext ctx = c.Resolve<IComponentContext>();
var consumer = ctx.Resolve<ConsumerAndDeliver>();
var bus = CreateBusInAzureServiceBusToRegister(consumer);
return bus;
})
.As<IBusControl>()
.As<IPublishEndpoint>()
.As<ISendEndpointProvider>()
.SingleInstance();
RegisterQueuesForCommands();
}
private IBusControl CreateBusInAzureServiceBusToRegister(object consumer)
{
var bus = Bus.Factory.CreateUsingAzureServiceBus(sbc =>
{
var host = sbc.Host(_configuration["myConnectionString"], h =>
{ });
sbc.ReceiveEndpoint(host, _configuration["myQueueName"], e =>
{
e.Instance(consumer);
});
});
return bus;
}
HostedService 用于管理总线生命周期
public class MassTransitHostedService : Microsoft.Extensions.Hosting.IHostedService
{
private readonly IBusControl _busControl;
public MassTransitHostedService(IBusControl busControl)
{
_busControl = busControl;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _busControl.StartAsync(cancellationToken);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _busControl.StopAsync(TimeSpan.FromSeconds(10));
}
}
发送和发布方法:
private readonly ISendEndpointProvider _commandSendEndpoint;
private readonly IPublishEndpoint _eventPublishEndpoint;
private async Task DispatchMessageToBus<T>(T message, CancellationToken cancellationToken = default)
{
dynamic msg = message;
switch (message)
{
case IOutcomingCommand outcomingCommand:
var type = outcomingCommand.GetType();
await _commandSendEndpoint.Send(outcomingCommand, type, cancellationToken);
break;
case IOutcomingEvent _:
await _eventPublishEndpoint.Publish(msg, cancellationToken);
break;
default:
throw new ArgumentException("Dispatch message must be IOutcomingCommand or IOutcomingEvent");
}
}
解决方案
推荐阅读
- angular - Angular 9 - 服务返回/列表包含数据但 *ngFor 没有显示任何内容
- python - OpenCV Python:FloodFill 表现得很奇怪
- html - 我有一个没有项目符号的无序列表,动画消失在屏幕外。但是,最后三个字母不动
- algorithm - 查找所有 Coprime 子集,直到一个数字 N
- javascript - 如何在反应组件中传递redux状态
- java - 解决这个编程挑战的最佳算法是什么?给定一个仅由数字 6 和 9 组成的正整数
- javascript - 仅更新 JavaScript 中的最新数组
- java - 如何使用 Chrome 配置文件无头运行 Webdriver
- amazon-web-services - 如果未找到文件,则禁用 Cloudfront 缓存
- typescript - 如何从离子中的承诺中返回价值