.net-core - 使用来自一个 rabbitmq 主机的消息并使用公共传输和 .net 核心发布到不同的 rabbitmq 主机
问题描述
使用不同的 rabbitmq 主机发布/使用消息时,我似乎遇到了问题。最初,pub / sub 发生在一台rabbitmq 主机上。决定创建另一台主机(位于不同的服务器上),从那以后我遇到了一些问题。我已经阅读了多总线文档(https://masstransit-project.com/usage/containers/multibus.html)并尝试实现它,以便我可以拥有一条用于生成消息的总线和一条用于消费消息的总线。请在下面找到代码:
--- 启动.cs ---
services.AddMassTransit(x =>
{
var section = configuration.GetSection("Rabbit1");
x.AddConsumer<SomeConsumer>();
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(section.GetValue<string>("Host"), host =>
{
host.Username(section.GetValue<string>("User"));
host.Password(section.GetValue<string>("Password"));
});
cfg.ReceiveEndpoint(QUEUE_NAME, e =>
{
e.ConfigureConsumer<SomeConsumer>(context);
EndpointConvention.Map<MessageObjectList>(e.InputAddress);
});
cfg.ConfigureEndpoints(context);
}));
});
services.AddMassTransit<ISecondBus>(x =>
{
var section = configuration.GetSection("Rabbit2");
x.AddConsumer<SomeOtherConsumer>();
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(section.GetValue<string>("Host"), host =>
{
host.Username(section.GetValue<string>("User"));
host.Password(section.GetValue<string>("Password"));
});
cfg.ReceiveEndpoint(QUEUE_NAME, e =>
{
e.PrefetchCount = 4;
e.UseMessageRetry(r => r.Interval(10, 100));
e.ConfigureConsumer<SomeOtherConsumer>(context);
EndpointConvention.Map<MessageObjectList>(e.InputAddress);
});
cfg.ConfigureEndpoints(context);
}));
});
services.AddMassTransitHostedService();
--- ISecondBus.cs ---
public interface ISecondBus : IBus
{
}
--- 消费者.cs ---
public class SomeConsumer : IConsumer<MessageObjectList>
{
private readonly IBus _bus;
public SomeConsumer(IBus bus)
{
_bus = bus;
}
public async Task Consume(ConsumeContext<MessageObjectList> context)
{
var message = context.Message;
//Aggregate data
var newList = new MessageObjectList
{
Message = message
};
var endpoint = await _bus.GetSendEndpoint(new Uri(_configuration.GetConnectionString("Rabbit2") + "/" + QUEUE_NAME));
await endpoint.Send(newList);
}
}
上面的代码工作正常,成功地消费了来自 rabbit1 的消息。但是,当向 Rabbit2 发送消息时,我可以看到正在创建队列,但没有数据被持久化到队列中。
请参阅下面的第二个消费者:
--- 启动.cs ---
services.AddMassTransit(x =>
{
var section = configuration.GetSection("Rabbitmq2");
x.AddConsumer<SomeOtherConsumer>();
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(section.GetValue<string>("Host"), host =>
{
host.Username(section.GetValue<string>("User"));
host.Password(section.GetValue<string>("Password"));
});
cfg.ReceiveEndpoint(QUEUE_NAME, e =>
{
e.ConfigureConsumer<SomeOtherConsumer>(context);
EndpointConvention.Map<MessageObjectList>(e.InputAddress);
});
cfg.ConfigureEndpoints(context);
}));
});
services.AddMassTransitHostedService();
--- SomeOtherConsumer.cs ---
public class SomeOtherConsumer: IConsumer<MessageObjectList>
{
public async Task Consume(ConsumeContext<MessageObjectList> context)
{
//Persist data to db
}
}
这里的问题是 rabbit2 的队列永远不会被填充(但它确实被创建)所以没有消息从第二个消费者消费。
任何援助将不胜感激。
解决方案
如果要从第一条总线发送到第二条总线,则需要在第一条总线的消费者中使用第二条总线接口。
public class SomeConsumer : IConsumer<MessageObjectList>
{
private readonly ISecondBus _bus;
public SomeConsumer(ISecondBus bus)
{
_bus = bus;
}
public async Task Consume(ConsumeContext<MessageObjectList> context)
{
var message = context.Message;
//Aggregate data
var newList = new MessageObjectList
{
Message = message
};
var endpoint = await _bus.GetSendEndpoint(new Uri("queue:" + QUEUE_NAME));
await endpoint.Send(newList);
}
}
推荐阅读
- google-drive-api - COLAB 中的 GAN 输出图像
- java - JTree 不过滤扩展
- c# - 发送鼠标左键单击 c# 控制台应用程序
- loops - 不能在循环外打印相同的值
- javascript - 从 React 中的 JSON 推送时,数组仅存储最后一个变量
- numpy - set directory does not work for TemporaryFile
- r - Getting data frame from loadings of factor analysis (fa function in psych)
- json - Query result in JSON format (key value pair) on using @Query annotation in Spring Boot, Hibernate
- php - Php - 显示来自多个表的页面内容?
- ios - 基于关系查找核心数据父对象的谓词