首页 > 解决方案 > 使用来自一个 rabbitmq 主机的消息并使用公共传输和 .net 核心发布到不同的 rabbitmq 主机

问题描述

使用不同的 rabbitmq 主机发布/使用消息时,我似乎遇到了问题。最初,pub / sub 发生在一台rabbitmq 主机上。决定创建另一台主机(位于不同的服务器上),从那以后我遇到了一些问题。我已经阅读了多总线文档(https://masstransit-project.com/usage/containers/multibus.html)并尝试实现它,以便我可以拥有一条用于生成消息的总线和一条用于消费消息的总线。请在下面找到代码:

--- 启动.cs ---

services.AddMassTransit(x =>
{
    var section = configuration.GetSection("Rabbit1");

    x.AddConsumer<SomeConsumer>();

    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(section.GetValue<string>("Host"), host =>
        {
            host.Username(section.GetValue<string>("User"));
            host.Password(section.GetValue<string>("Password"));
        });

        cfg.ReceiveEndpoint(QUEUE_NAME, e =>
        {
           e.ConfigureConsumer<SomeConsumer>(context);

            EndpointConvention.Map<MessageObjectList>(e.InputAddress);
        });

        cfg.ConfigureEndpoints(context);
    }));
});

services.AddMassTransit<ISecondBus>(x =>
{
    var section = configuration.GetSection("Rabbit2");

    x.AddConsumer<SomeOtherConsumer>();

    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(section.GetValue<string>("Host"), host =>
        {
            host.Username(section.GetValue<string>("User"));
            host.Password(section.GetValue<string>("Password"));
        });

        cfg.ReceiveEndpoint(QUEUE_NAME, e =>
        {
            e.PrefetchCount = 4;
            e.UseMessageRetry(r => r.Interval(10, 100));

            e.ConfigureConsumer<SomeOtherConsumer>(context);

            EndpointConvention.Map<MessageObjectList>(e.InputAddress);
        });

        cfg.ConfigureEndpoints(context);
    }));
});

services.AddMassTransitHostedService();          

--- ISecondBus.cs ---

public interface ISecondBus : IBus
{
}

--- 消费者.cs ---

public class SomeConsumer : IConsumer<MessageObjectList>
{
   private readonly IBus _bus;

   public SomeConsumer(IBus bus)
   {
     _bus = bus;
   }

   public async Task Consume(ConsumeContext<MessageObjectList> context)
   {
       var message = context.Message;

       //Aggregate data

       var newList = new MessageObjectList
                    {
                        Message = message
                    };

      var endpoint = await _bus.GetSendEndpoint(new Uri(_configuration.GetConnectionString("Rabbit2") + "/" + QUEUE_NAME));
      await endpoint.Send(newList);
   }
}

上面的代码工作正常,成功地消费了来自 rabbit1 的消息。但是,当向 Rabbit2 发送消息时,我可以看到正在创建队列,但没有数据被持久化到队列中。

请参阅下面的第二个消费者:

--- 启动.cs ---

services.AddMassTransit(x =>
{
    var section = configuration.GetSection("Rabbitmq2");

    x.AddConsumer<SomeOtherConsumer>();

    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(section.GetValue<string>("Host"), host =>
        {
            host.Username(section.GetValue<string>("User"));
            host.Password(section.GetValue<string>("Password"));
        });

        cfg.ReceiveEndpoint(QUEUE_NAME, e =>
        {
            e.ConfigureConsumer<SomeOtherConsumer>(context);

            EndpointConvention.Map<MessageObjectList>(e.InputAddress);
        });

        cfg.ConfigureEndpoints(context);
    }));
});

services.AddMassTransitHostedService();

--- SomeOtherConsumer.cs ---

public class SomeOtherConsumer: IConsumer<MessageObjectList>
{
     public async Task Consume(ConsumeContext<MessageObjectList> context)
     {
        //Persist data to db
     }
}

这里的问题是 rabbit2 的队列永远不会被填充(但它确实被创建)所以没有消息从第二个消费者消费。

任何援助将不胜感激。

标签: .net-corerabbitmqmasstransit

解决方案


如果要从第一条总线发送到第二条总线,则需要在第一条总线的消费者中使用第二条总线接口。

public class SomeConsumer : IConsumer<MessageObjectList>
{
   private readonly ISecondBus _bus;

   public SomeConsumer(ISecondBus bus)
   {
     _bus = bus;
   }

   public async Task Consume(ConsumeContext<MessageObjectList> context)
   {
       var message = context.Message;

       //Aggregate data

       var newList = new MessageObjectList
                    {
                        Message = message
                    };

      var endpoint = await _bus.GetSendEndpoint(new Uri("queue:" + QUEUE_NAME));
      await endpoint.Send(newList);
   }
}

推荐阅读