首页 > 解决方案 > MassTransit Kafka 消费者未被调用

问题描述

所以我正在尝试使用 MassTransit 设置 kafka。

我可以发布一个主题,因为我可以在 conduktor 上看到它: 在此处输入图像描述

问题是当我想用 MassTransit 使用它时,它似乎没有调用我的消费者代码。我已经用 Kafka-Sharp 在其他地方进行了测试,以测试消耗,这似乎有效。

但是对于 MassTransit,似乎某个地方的绑定被破坏了。

也没有发生错误,所以在这一点上我几乎一无所知。

如您所见,它根据 MassTransit 与主题绑定: 在此处输入图像描述

我的工人代码:


    class Program
    {
        static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        private static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args)
            .ConfigureServices(services =>
            {
                services.AddClient();

                services.AddMassTransit(x =>
                {
                    x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));

                    x.AddRider(rider =>
                    {
                        rider.AddConsumer<ClientCreatedConsumer>();

                        rider.UsingKafka((context, k) =>
                        {
                            k.Host("localhost:9092");

                            k.TopicEndpoint<Null, IClientCreatedMessage>("IClientCreatedMessage", "test", e =>
                            {
                                e.AutoOffsetReset = AutoOffsetReset.Earliest;
                                e.ConfigureConsumer<ClientCreatedConsumer>(context);
                            });
                        });
                    });
                });
                services.AddMassTransitHostedService(true);
            });
    }

我的消费者代码:

public class ClientCreatedConsumer : IConsumer<IClientCreatedMessage>
    {
        private readonly ILogger<ClientCreatedConsumer> _logger;

        public ClientCreatedConsumer(ILogger<ClientCreatedConsumer> logger)
        {
            _logger = logger;
        }

        public Task Consume(ConsumeContext<IClientCreatedMessage> context)
        {
            _logger.LogInformation("Message Recieved!");
            return Task.CompletedTask;
        }
    }

我的主题界面:

    public interface IClientCreatedMessage
    {
        public int Id { get; set; }
    }

我在制作人所在的 ASPNET Core API 上的启动:

public void ConfigureServices(IServiceCollection services)
        {
            services.AddClient();

            services.AddMassTransit(x =>
            {
                x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));


                x.AddRider(rider =>
                {
                    rider.AddProducer<IClientCreatedMessage>(nameof(IClientCreatedMessage));

                    rider.UsingKafka((context, k) =>
                    {
                        k.Host("localhost:9092");
                    });
                });
            });
            services.AddMassTransitHostedService();

            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo {Title = "Test.Api", Version = "v1"});
            }); 
            
            services.AddMvc();
        }

我调用和生成位于端点调用中的主题的代码:

                var producer = context.RequestServices.GetService<ITopicProducer<IClientCreatedMessage>>();

                producer?.Produce(new
                {
                    Id = 3
                });

                await context.Response.WriteAsync("Client has been created!");

当消息从我的生产者发送时,这就是将消费者放在 Conduktor 上捕获的内容: 在此处输入图像描述

标签: c#apache-kafka.net-5masstransit

解决方案


推荐阅读