c# - MassTransit 消费者测试通过但引发令人困惑的错误
问题描述
我正在尝试使用 MassTransit.Testing 框架和 InMemoryTestHarness 对 MassTransit 消费者进行单元测试。
到目前为止,我能够成功测试是否为两个单独的消费者发送了一条消息。
其中一位消费者也被成功消费,但我收到如下错误消息:
R-FAULT loopback://localhost/vhost/input_queue 49820000-5689-0050-3b5c-08d5ecc4708c Acme.Company.Messages.Commands.ISomeCommand Acme.Company.SomeService.Consumers.SomeCommandConsumer(00:00:00.2328493) 失败:有效载荷未找到:MassTransit.RabbitMqTransport.ModelContext,StackTrace:在 MassTransit.DeferExtensions.Defer[T](ConsumeContext
1 context, TimeSpan delay, Action
2 回调)的 GreenPipes.PipeExtensions.GetPayload[TPayload](PipeContext 上下文)
此时的代码试图将消息延迟一分钟,所以我想知道这是否是缺少有效负载的原因???
代码如下:
[TestFixture]
public class SomeCommandConsumerTests
{
private InMemoryTestHarness _harness;
private Mock<ISomeRepository> _SomeRepositoryMock;
private Mock<IAnotherRepository> _AnotherRepositoryMock;
[OneTimeSetUp]
public async Task OneTimeInit()
{
_harness = new InMemoryTestHarness("vhost");
_harness.Consumer(() => new SomeCommandConsumer(_SomeRepositoryMock.Object, _AnotherRepositoryMock.Object));
await _harness.Start();
}
[SetUp]
public void Init()
{
_SomeRepositoryMock = new Mock<ISomeRepository>();
_AnotherRepositoryMock = new Mock<IAnotherRepository>();
_SomeRepositoryMock.Setup(x => x.UpdateSomeId(It.IsAny<SomeEnum>(), It.IsAny<int>()))
.Returns(Task.Factory.StartNew(() => { }));
_SomeRepositoryMock.Setup(x => x.UpdateProcMessage(It.IsAny<string>(), It.IsAny<int>()))
.Returns(Task.Factory.StartNew(() => { }));
_SomeRepositoryMock.Setup(
x => x.UpdateSomeProcStartTime(It.IsAny<int>()))
.Returns(Task.Factory.StartNew(() => { }));
_SomeRepositoryMock.Setup(
x => x.UpdateSomeProcEndTime(It.IsAny<int>()))
.Returns(Task.Factory.StartNew(() => { }));
}
[Test]
public async Task ProcessMessage_MethodCalledWithSomeCondition_MessageSent()
{
//Arrange
_SomeRepositoryMock.Setup(x => x.GetAsync(It.IsAny<int>())).ReturnsAsync(new Entity
{
Property1 = true,
SomeID = 12345
});
await _harness.InputQueueSendEndpoint.Send(new SomeCommand
{
MessageType = MessageTypeEnum.SomeMessgae,
SomeID = 12345
});
//Assert
_harness.Sent.Select<ISomeCommand>().Any().Should().BeTrue();
}
[Test]
public async Task ProcessMessage_MethodCalledWithSomeCondition_CorrectNextStepReturned()
{
//Arrange
_SomeRepositoryMock.Setup(x => x.GetAsync(It.IsAny<int>())).ReturnsAsync(new Control()
{
Property1 = true,
SomeID = 12345
});
await _harness.InputQueueSendEndpoint.Send(new SomeCommand
{
MessageType = MessageTypeEnum.SomeMessgae,
SomeID = 12345
});
//Assert
_harness.Consumed.Select<ISomeCommand>().Any().Should().BeTrue();
_harness.Consumed
.Select<ISomeCommand>()
.First()
.Context
.Message
.SomeID
.Should()
.Be(12345);
_harness.Consumed
.Select<ISomeCommand>()
.First()
.Context
.Message
.MessageProcessingResult
.Should()
.Be(MessageProcessingResult.DeferProcessing);
}
[OneTimeTearDown]
public async Task Teardown()
{
await _harness.Stop();
}
}
在消费者中被击中的代码是:
await context.Defer(TimeSpan.FromMinutes(1));
基本上,我错过了什么,这甚至是一个问题吗?
解决方案
Defer
发生这种情况是因为您正在使用具有RabbitMQ 支持的功能 ( ) 的内存测试工具。Defer 尝试使用来自消费者的 RabbitMQ 模型来延迟消息,但它不存在,因为内存中对此一无所知。
如果您想使用更通用的解决方案,请Redeliver
改用。您需要将 QuartzIntegration 库与内存测试工具一起使用,但它使用该调度程序执行内存消息重新传递。
您还需要更新您的 RabbitMQ 总线配置以包括,cfg.UseDelayedExchangeMessageScheduler();
以便 RabbitMQ 用于消息调度。
推荐阅读
- android - 后端内部错误:Jetpack compose 中 psi2ir 期间出现异常
- python - Keras-Tuner:是否可以在目标/度量函数中使用测试/验证集?
- c# - 使用没有主键的 SQLAdapter 和 SQLCommandBuilder
- react-native - react-native-cn-quill QuillToolbar 触摸没有响应
- flutter - 如何在颤振中创建透明的底部导航栏?尝试了很多方法,但没有任何效果
- python - pygithub提交改进
- api - 使用 Azure AD B2C 的 Xamarin 表单在调用 Web api 时获得 401 Unauthorized
- android - Android - 布局在显示警报对话框时重置 | Alertdialog 重新设置自定义列表视图的视图
- typescript - Vue js 3 - 类型“CreateComponentPublicInstance<{}、{}、{}、{}、{} 上不存在属性“项目”,
- python - 返回字符串无穷次