首页 > 解决方案 > MassTransit 消费失败

问题描述

我正在切换一些使用 MassTransit(.NET 5 上的 v7.2.2)的代码以使用更具声明性的格式(并且远离对 ReceiveEndpoint() 的多次调用),并且理想情况下使用 ConsumerDefinitions 进行配置(尽管不是本示例的一部分为简单起见),以及使用 Quartz.NET 的一些依赖注入(从这个示例中提取,尽管它运行 3.3.3),这样做我现在发现我的消费者没有消费,尽管发送了消息并引用了示例。以 MassTransit 服务为例:

        var hostBuilder = Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddMassTransit(mt =>
                {
                    mt.AddBus(provider => Bus.Factory.CreateUsingInMemory(cfg =>
                    {
                        //cfg.AutoStart = true; //No change when on
                        cfg.UseInMemoryOutbox();
                        cfg.ConfigureEndpoints(provider);
                    }));

                    mt.AddConsumer<TheMessageConsumer>();
                    services.AddMediator(cfg =>
                    {
                        cfg.AddConsumer<TheMessageConsumer>();
                    });
                });

                services.AddMassTransitHostedService();
            });
        var host = hostBuilder.Build();
        var busControl = host.Services.GetService<IBusControl>();

        busControl.Start(); //Just in case

        var message = new TheMessage() { Message = $"<Message-{DateTime.Now.ToLongTimeString()}>" };
        Console.WriteLine($"Sending: {message.Message}");
        await busControl.Publish(message);

        host.Run();

请注意,此处发送消息的中断是为了简化我的复制,因为在我的完整代码库中,它是由 Quartz 解雇的工作发送的。

对于这个例子,消息和接收者也很简单:

    public class TheMessage
    {
        public string Message { get; set; }
    }

    public class TheMessageConsumer : IConsumer<TheMessage>
    {
        public Task Consume(ConsumeContext<TheMessage> context)
        {
            Console.WriteLine($"Message received: {context.Message.Message}");

            return Task.CompletedTask;
        }
    }

总线已启动,在我明确启动它的情况下,设置了 AutoStart 标志,或者 MassTransitHostedService 启动它,但未收到消息。如果我有完整的例子,Quartz 会在很久以后用消息启动工作。

有人可以建议我缺少什么吗?

标签: masstransit

解决方案


您发布的代码实际上是一大堆片段,在一起使用时没有任何意义。例如,AddBus不推荐使用,并且调解员根本没有参与该项目的业务。

我建议使用MassTransit 模板之一从头开始创建一个新项目(您可能需要将 NuGet 版本升级到 7.2.2)。

观看此视频,其中解释了模板以及如何使用它们。

有了您的评论和更新的问题,您实际上只需要以下内容:

var hostBuilder = Host.CreateDefaultBuilder(args)
    .ConfigureServices((hostContext, services) =>
    {
        services.AddMassTransit(mt =>
        {
            mt.AddConsumer<TheMessageConsumer>();

            mt.UsingInMemory((context,cfg) =>
            {
                cfg.UseInMemoryOutbox();
                cfg.ConfigureEndpoints(context);
            }));
        });

        services.AddMassTransitHostedService();
    });
var host = hostBuilder.Build();

await host.RunAsync();

如果您想对其进行测试,并在同一过程中发送消息,只需添加一个BackgroundService(after AddMassTransitHostedService) 即可发布您的消息。

在公共汽车启动之前,您不应该发布。


推荐阅读