首页 > 解决方案 > 无法在 .NetCore 中使用 Masstransit 接收消息

问题描述

我正在使用 Masstransit、RabbitMq 和 .NET 核心。我想发布和接收消息时遇到问题。我UserEventHandlerUserCommandHandler班级从不打电话。有谁能够帮我?巴士启动。当我从注册中删除课程时,我也遇到了一种情况,我的断点在课堂UserEventHandler上停止。UserCommandHandler

这是我的UserEventHandler课:

public class UserEventHandler : IConsumer<IUserEvent>
{
    private ConcertDbContext _context;
    public UserEventHandler(ConcertDbContext context)
    {
        _context = context;
    }
    public Task Consume(ConsumeContext<IUserEvent> context)
    {
        var concerts = _context.Concerts.Where(c => c.Name == context.Message.Name);
        foreach (var concert in concerts)
        {
            concert.SoldTickets = context.Message.TicketBuyed;
            _context.Concerts.Update(concert);
            _context.SaveChanges();
        }
        throw new NotImplementedException();
    }
}

这是我的 UserCommandHandler 类:

public class UserCommandHandler : IConsumer<IUserCommand>
{
    public async Task Consume(ConsumeContext<IUserCommand> context)
    {
        await context.Publish<IUserEvent>(new UserEvent(context.Message.Name, context.Message.TicketBuyed));
    }
}

我在 Startup.cs 类中注册了 UserCommandHandler:

        services.AddMassTransit(x =>
        {
            x.AddConsumer<ConcertEventHandler>();
            x.AddConsumer<UserCommandHandler>();
            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri),
                    hst =>
                    {
                        hst.Username(RabbitMqConstants.Username);
                        hst.Password(RabbitMqConstants.Password);
                    });

                cfg.ReceiveEndpoint(host, RabbitMqConstants.CreateConcertService, e =>
                {
                    e.PrefetchCount = 16;

                    e.ConfigureConsumer<ConcertEventHandler>(provider);
                });

                cfg.ReceiveEndpoint(host, RabbitMqConstants.BuyCncertTicketService, e =>
                {
                    e.PrefetchCount = 16;

                    e.ConfigureConsumer<UserCommandHandler>(provider);
                });

                // or, configure the endpoints by convention
                cfg.ConfigureEndpoints(provider);
            }));

这是我注册的 UserEventHandler 类

                services.AddMassTransit(x =>
        {
            x.AddConsumer<ConcertCommandHandler>();
            x.AddConsumer<UserEventHandler >();
            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri),
                    hst =>
                    {
                        hst.Username(RabbitMqConstants.Username);
                        hst.Password(RabbitMqConstants.Password);
                    });

                cfg.ReceiveEndpoint(host, RabbitMqConstants.CreateConcertService, e =>
                {
                    e.PrefetchCount = 16;

                    e.ConfigureConsumer<ConcertCommandHandler>(provider);
                });
                cfg.ReceiveEndpoint(host, RabbitMqConstants.BuyCncertTicketService, e =>
                {
                    e.PrefetchCount = 16;

                    e.ConfigureConsumer<UserEventHandler>(provider);
                });

                // or, configure the endpoints by convention
                cfg.ConfigureEndpoints(provider);
            }));

标签: c#.netrabbitmqmessagingmasstransit

解决方案


每个服务都应该使用单独的队列,目前您正在为每个服务配置具有相同接收端点队列名称的单独消费者。这将导致消息被移动到_skipped队列,因为该服务实例上没有已发布消息的使用者。

此外,删除cfg.ConfigureEndpoints每个服务配置末尾的 。您正在手动配置接收端点,因此它将为您的消费者创建重复的队列和端点。


推荐阅读