c# - 无法在 .NetCore 中使用 Masstransit 接收消息
问题描述
我正在使用 Masstransit、RabbitMq 和 .NET 核心。我想发布和接收消息时遇到问题。我UserEventHandler
和UserCommandHandler
班级从不打电话。有谁能够帮我?巴士启动。当我从注册中删除课程时,我也遇到了一种情况,我的断点在课堂UserEventHandler
上停止。UserCommandHandler
这是我的UserEventHandler
课:
public class UserEventHandler : IConsumer<IUserEvent>
{
private ConcertDbContext _context;
public UserEventHandler(ConcertDbContext context)
{
_context = context;
}
public Task Consume(ConsumeContext<IUserEvent> context)
{
var concerts = _context.Concerts.Where(c => c.Name == context.Message.Name);
foreach (var concert in concerts)
{
concert.SoldTickets = context.Message.TicketBuyed;
_context.Concerts.Update(concert);
_context.SaveChanges();
}
throw new NotImplementedException();
}
}
这是我的 UserCommandHandler 类:
public class UserCommandHandler : IConsumer<IUserCommand>
{
public async Task Consume(ConsumeContext<IUserCommand> context)
{
await context.Publish<IUserEvent>(new UserEvent(context.Message.Name, context.Message.TicketBuyed));
}
}
我在 Startup.cs 类中注册了 UserCommandHandler:
services.AddMassTransit(x =>
{
x.AddConsumer<ConcertEventHandler>();
x.AddConsumer<UserCommandHandler>();
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri),
hst =>
{
hst.Username(RabbitMqConstants.Username);
hst.Password(RabbitMqConstants.Password);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.CreateConcertService, e =>
{
e.PrefetchCount = 16;
e.ConfigureConsumer<ConcertEventHandler>(provider);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.BuyCncertTicketService, e =>
{
e.PrefetchCount = 16;
e.ConfigureConsumer<UserCommandHandler>(provider);
});
// or, configure the endpoints by convention
cfg.ConfigureEndpoints(provider);
}));
这是我注册的 UserEventHandler 类
services.AddMassTransit(x =>
{
x.AddConsumer<ConcertCommandHandler>();
x.AddConsumer<UserEventHandler >();
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri),
hst =>
{
hst.Username(RabbitMqConstants.Username);
hst.Password(RabbitMqConstants.Password);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.CreateConcertService, e =>
{
e.PrefetchCount = 16;
e.ConfigureConsumer<ConcertCommandHandler>(provider);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.BuyCncertTicketService, e =>
{
e.PrefetchCount = 16;
e.ConfigureConsumer<UserEventHandler>(provider);
});
// or, configure the endpoints by convention
cfg.ConfigureEndpoints(provider);
}));
解决方案
每个服务都应该使用单独的队列,目前您正在为每个服务配置具有相同接收端点队列名称的单独消费者。这将导致消息被移动到_skipped队列,因为该服务实例上没有已发布消息的使用者。
此外,删除cfg.ConfigureEndpoints
每个服务配置末尾的 。您正在手动配置接收端点,因此它将为您的消费者创建重复的队列和端点。
推荐阅读
- json - json编组中的额外括号
- javascript - 从 React 模板制作嵌套路由时难以使用 useRouteMatch
- python - 如何从与画面相关的网络调用中获取 POST url 规范
- json - 使用 Retrofit 和 Moshi 处理来自 Api 的不一致类型
- apache-flink - 源操作符卡在 requestBufferBuilderBlocking 中
- java - 如何从 2 个整数中获得下一个最接近的整数?
- java - 如何在 JavaFX 项目中实现 RichTextFX
- python - 初学者:Python--如何将 elif 语句嵌入到 while 循环中(每次通过循环都会改变输出的路径?!)
- python - KeyError:from_pandas_edge_list() 的“来源”
- node.js - 如何使用 DateTimePicker 在 android 中显示选定的日期和时间