masstransit - MassTransit 消费失败
问题描述
我正在切换一些使用 MassTransit(.NET 5 上的 v7.2.2)的代码以使用更具声明性的格式(并且远离对 ReceiveEndpoint() 的多次调用),并且理想情况下使用 ConsumerDefinitions 进行配置(尽管不是本示例的一部分为简单起见),以及使用 Quartz.NET 的一些依赖注入(从这个示例中提取,尽管它运行 3.3.3),这样做我现在发现我的消费者没有消费,尽管发送了消息并引用了示例。以 MassTransit 服务为例:
var hostBuilder = Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddMassTransit(mt =>
{
mt.AddBus(provider => Bus.Factory.CreateUsingInMemory(cfg =>
{
//cfg.AutoStart = true; //No change when on
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(provider);
}));
mt.AddConsumer<TheMessageConsumer>();
services.AddMediator(cfg =>
{
cfg.AddConsumer<TheMessageConsumer>();
});
});
services.AddMassTransitHostedService();
});
var host = hostBuilder.Build();
var busControl = host.Services.GetService<IBusControl>();
busControl.Start(); //Just in case
var message = new TheMessage() { Message = $"<Message-{DateTime.Now.ToLongTimeString()}>" };
Console.WriteLine($"Sending: {message.Message}");
await busControl.Publish(message);
host.Run();
请注意,此处发送消息的中断是为了简化我的复制,因为在我的完整代码库中,它是由 Quartz 解雇的工作发送的。
对于这个例子,消息和接收者也很简单:
public class TheMessage
{
public string Message { get; set; }
}
public class TheMessageConsumer : IConsumer<TheMessage>
{
public Task Consume(ConsumeContext<TheMessage> context)
{
Console.WriteLine($"Message received: {context.Message.Message}");
return Task.CompletedTask;
}
}
总线已启动,在我明确启动它的情况下,设置了 AutoStart 标志,或者 MassTransitHostedService 启动它,但未收到消息。如果我有完整的例子,Quartz 会在很久以后用消息启动工作。
有人可以建议我缺少什么吗?
解决方案
您发布的代码实际上是一大堆片段,在一起使用时没有任何意义。例如,AddBus
不推荐使用,并且调解员根本没有参与该项目的业务。
我建议使用MassTransit 模板之一从头开始创建一个新项目(您可能需要将 NuGet 版本升级到 7.2.2)。
观看此视频,其中解释了模板以及如何使用它们。
有了您的评论和更新的问题,您实际上只需要以下内容:
var hostBuilder = Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddMassTransit(mt =>
{
mt.AddConsumer<TheMessageConsumer>();
mt.UsingInMemory((context,cfg) =>
{
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(context);
}));
});
services.AddMassTransitHostedService();
});
var host = hostBuilder.Build();
await host.RunAsync();
如果您想对其进行测试,并在同一过程中发送消息,只需添加一个BackgroundService
(after AddMassTransitHostedService
) 即可发布您的消息。
在公共汽车启动之前,您不应该发布。
推荐阅读
- android - 如何在android中自定义进度条
- sql - SQL oracle - 根据多重条件更新一列
- c - 从 x86 汇编器调用 C 函数
- javascript - 如何以编程方式创建 Vue.js 插槽?
- angular - 在角度中,将值分配给嵌套对象
- vaadin8 - Vaadin 8.4.0 Modal for save confirmation after grid buffer save
- forms - 在 symfony 表单中按字母顺序对组进行排序
- selector - Angular5 - 从另一个组件调用时,选择器不会绘制 component.html
- angular - Angular,使用路由器导航时测试哪个组件是实例化的
- ios - PHAsset fetchAssetsWithOptions 第一次没有获取图像objective-c