.net-core - MessageLockLostException on unhandled exception when using MassTransit's ConfigureDeadLetterQueueErrorTransport with Azure Service Bus
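Question
I'm trying to set up MassTransit with Azure Service Bus. I want to use the DLQ instead of relying on the _skipped and _error queues, but I'm not sure whether I'm doing something wrong.
Here is a simplified version of my setup: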
services.AddMassTransit(busConfigurator =>
{
    busConfigurator.AddConsumer<MessageConsumer>();
    busConfigurator.UsingAzureServiceBus((context, serviceBusBusFactoryConfigurator) =>
    {
        serviceBusBusFactoryConfigurator.Host(connectionString);
        serviceBusBusFactoryConfigurator.SubscriptionEndpoint<Message>(
            "message-subscription",
            configurator =>
            {
                configurator.ConfigureDeadLetterQueueErrorTransport();
                configurator.ConfigureDeadLetterQueueDeadLetterTransport();
                configurator.ConfigureConsumer<MessageConsumer>(context);
            });
    });
});
services.AddMassTransitHostedService(true);
---
public class Message
{
    public string Text { get; set; }
}

public class MessageConsumer : IConsumer<Message>
{
    public Task Consume(ConsumeContext<Message> context)
    {
        // Always fails, to exercise the error path.
        throw new Exception("Hello");
    }
}
When I publish a message (await _bus.Publish<Message>(new {})), I get the following in the logs:
[15:19:22 | DBG | MassTransit] Create send transport: sb://mytestbus.servicebus.windows.net/MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message--
[15:19:22 | DBG | MassTransit] Topic: MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- ()
[15:19:22 | DBG | MassTransit] Topic: MassTransit/Fault ()
[15:19:22 | DBG | MassTransit] Subscription Fault-MassTransit (MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- -> sb://mytestbus.servicebus.windows.net/MassTransit/Fault)
[15:19:22 | DBG | MassTransit] SEND sb://mytestbus.servicebus.windows.net/MassTransit/Fault--Acme.Common.Messaging.Contracts.Events/Message-- 09c40000-5dcc-0015-85cf-08d9988339b8 MassTransit.Fault<Acme.Common.Messaging.Contracts.Events.Message>
[15:19:22 | ERR | MassTransit] R-FAULT sb://mytestbus.servicebus.windows.net/Acme.Common.Messaging.Contracts.Events/Message/Subscriptions/message-subscription 09c40000-5dcc-0015-5c52-08d998833786 Acme.Common.Messaging.Contracts.Events.Message Acme.Services.UpdateService.MessageConsumer(00:00:03.0423556)
System.Exception: Hello
at Acme.Services.UpdateService.MessageConsumer.Consume(ConsumeContext`1 context) in C:\Development\acme\src\Acme.Services.UpdateService\Startup.cs:line 503
at MassTransit.Pipeline.Filters.MethodConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumerConsumeContext<TConsumer,TMessage>>.Send(ConsumerConsumeContext`2 context, IPipe`1 next)
at GreenPipes.Pipes.LastPipe`1.Send(TContext context)
at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
[15:19:23 | WRN | MassTransit] Message Lock Lost: 09c400005dcc00155c5208d998833786
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DeadLetterAsync(String lockToken, IDictionary`2 propertiesToModify)
at MassTransit.Azure.ServiceBus.Core.Contexts.ReceiverClientMessageLockContext.DeadLetter(Exception exception)
at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
[15:19:23 | WRN | MassTransit] Exception on Receiver sb://mytestbus.servicebus.windows.net/Acme.Common.Messaging.Contracts.Events/Message/Subscriptions/message-subscription during UserCallback ActiveDispatchCount(0) ErrorRequiresRecycle(False)
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DeadLetterAsync(String lockToken, IDictionary`2 propertiesToModify)
at MassTransit.Azure.ServiceBus.Core.Contexts.ReceiverClientMessageLockContext.DeadLetter(Exception exception)
at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Azure.ServiceBus.Core.Pipeline.DeadLetterQueueExceptionFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.GenerateFaultFilter.GreenPipes.IFilter<MassTransit.ExceptionReceiveContext>.Send(ExceptionReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
at MassTransit.Azure.ServiceBus.Core.Contexts.SubscriptionClientContext.<>c__DisplayClass16_0.<<OnMessageAsync>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at Microsoft.Azure.ServiceBus.MessageReceivePump.MessageDispatchTask(Message message)
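Is this by design, or am I doing something wrong? I expected the message to be moved to the DLQ without any warnings being logged (other than the exception I threw).
If I comment out ConfigureDeadLetterQueueErrorTransport, I no longer get the MessageLockLostException.
Using MassTransit 7.2.3.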
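Answer
For subscription endpoints, the DLQ is the default behavior.
What you configured probably conflicts with that default configuration. If you remove the following lines, it should work as expected: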
configurator.ConfigureDeadLetterQueueErrorTransport();
configurator.ConfigureDeadLetterQueueDeadLetterTransport();
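You can see that these same lines are already configured for subscription endpoints.
They are only needed to make a receive endpoint use the DLQ.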
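
To make the difference concrete, here is a sketch of the setup from the question with the two explicit calls removed from the subscription endpoint and moved to a queue-based receive endpoint, which is where the answer says they belong. It is a configuration fragment in the same Startup context as the question, not a complete program. The queue name "message-queue" is a made-up placeholder, and binding the same consumer to both endpoints is done only for illustration. In the stack trace above, DeadLetterQueueExceptionFilter appears twice. That would be consistent with the filter being registered once by default and once by the explicit call, so the second DeadLetter attempt finds the lock already released, but this is an inference from the log and not something confirmed here.

```csharp
services.AddMassTransit(busConfigurator =>
{
    busConfigurator.AddConsumer<MessageConsumer>();
    busConfigurator.UsingAzureServiceBus((context, cfg) =>
    {
        cfg.Host(connectionString);

        // Subscription endpoint: per the answer, faulted and skipped messages
        // already go to the subscription's DLQ, so no explicit DLQ calls here.
        cfg.SubscriptionEndpoint<Message>("message-subscription", e =>
        {
            e.ConfigureConsumer<MessageConsumer>(context);
        });

        // Queue-based receive endpoint ("message-queue" is a hypothetical name):
        // this is the case where the two calls are needed to use the DLQ
        // instead of the _error and _skipped queues.
        cfg.ReceiveEndpoint("message-queue", e =>
        {
            e.ConfigureDeadLetterQueueErrorTransport();
            e.ConfigureDeadLetterQueueDeadLetterTransport();
            e.ConfigureConsumer<MessageConsumer>(context);
        });
    });
});
services.AddMassTransitHostedService(true);
```

If only the subscription endpoint is needed, the ReceiveEndpoint block can be dropped entirely. The fix itself is just the removal of the two calls from the subscription endpoint.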