c# - MassTransit Kafka 消费者未被调用
问题描述
所以我正在尝试使用 MassTransit 设置 kafka。
我可以发布一个主题,因为我可以在 conduktor 上看到它:
问题是当我想用 MassTransit 使用它时,它似乎没有调用我的消费者代码。我已经用 Kafka-Sharp 在其他地方进行了测试,以测试消耗,这似乎有效。
但是对于 MassTransit,似乎某个地方的绑定被破坏了。
也没有发生错误,所以在这一点上我几乎一无所知。
我的工人代码:
class Program
{
static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
private static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.AddClient();
services.AddMassTransit(x =>
{
x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
x.AddRider(rider =>
{
rider.AddConsumer<ClientCreatedConsumer>();
rider.UsingKafka((context, k) =>
{
k.Host("localhost:9092");
k.TopicEndpoint<Null, IClientCreatedMessage>("IClientCreatedMessage", "test", e =>
{
e.AutoOffsetReset = AutoOffsetReset.Earliest;
e.ConfigureConsumer<ClientCreatedConsumer>(context);
});
});
});
});
services.AddMassTransitHostedService(true);
});
}
我的消费者代码:
public class ClientCreatedConsumer : IConsumer<IClientCreatedMessage>
{
private readonly ILogger<ClientCreatedConsumer> _logger;
public ClientCreatedConsumer(ILogger<ClientCreatedConsumer> logger)
{
_logger = logger;
}
public Task Consume(ConsumeContext<IClientCreatedMessage> context)
{
_logger.LogInformation("Message Recieved!");
return Task.CompletedTask;
}
}
我的主题界面:
public interface IClientCreatedMessage
{
public int Id { get; set; }
}
我在制作人所在的 ASPNET Core API 上的启动:
public void ConfigureServices(IServiceCollection services)
{
services.AddClient();
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
x.AddRider(rider =>
{
rider.AddProducer<IClientCreatedMessage>(nameof(IClientCreatedMessage));
rider.UsingKafka((context, k) =>
{
k.Host("localhost:9092");
});
});
});
services.AddMassTransitHostedService();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo {Title = "Test.Api", Version = "v1"});
});
services.AddMvc();
}
我调用和生成位于端点调用中的主题的代码:
var producer = context.RequestServices.GetService<ITopicProducer<IClientCreatedMessage>>();
producer?.Produce(new
{
Id = 3
});
await context.Response.WriteAsync("Client has been created!");
解决方案
推荐阅读
- java - 在 LeanFT 中最大化浏览器窗口
- python - 无法在 Windows 中更改默认 python 版本
- push-notification - Service Worker 中的推送支持是否依赖于 Internet 和第三方 Google/Apple 服务?
- python - 在 digitalocean Ubuntu 16.04 中安装新版本的 Python 后无法运行 Django 控制台
- jenkins - 在 Windows 的 ubuntu 子系统上运行 jenkins
- r - 打印长代码块R markdown pdf
- javafx - 导出到 jar 文件时,JavaFX 应用程序未加载 fxml 文件
- asp.net - 有没有办法调用删除 asmx 文件的 web 服务?
- openstack-neutron - 404:PecanNotFound OpenStack Neutron
- mysql - R,闪亮,应用程序前的弹出窗口