spring-boot - rabbitMQ 创建两个独立的消费者,他们从同一服务中的两个不同集群消费
问题描述
我正在尝试创建两个单独的消费者,他们从同一服务中的两个不同集群中消费。
我已尝试创建新连接,但仍然仅在其中一个集群中创建了两个交换。
我在这里遗漏了什么?我正在使用 spring-boot:2.2.5.RELEASE 和 spring-rabbit:2.2.5.RELEASE
我的配置是这样的。
@Configuration
@AllArgsConstructor
public class RabbitMQConfiguration {
private final Connection_A_MQProperties connectionAMQProperties;
private final Connection_B_MQProperties connectionBMQProperties;
@Primary
@Bean
public ConnectionFactory connectionFactoryA(Connection_A_MQProperties connectionAMQProperties) {
return createConnection(connectionAMQProperties.getBaseProperties());
}
@Bean
public ConnectionFactory connectionFactoryB(Connection_B_MQProperties connectionBMQProperties) {
return createConnection(connectionBMQProperties.getBaseProperties());
}
private ConnectionFactory createConnection(BaseProperties baseProperties){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost(baseProperties.getHost());
factory.setPort(baseProperties.getPort());
factory.setUsername(baseProperties.getUsername());
factory.setPassword(baseProperties.getPassword());
factory.setConnectionTimeout(baseProperties.getConnectionTimeout());
factory.setRequestedHeartBeat(baseProperties.getRequestedHeartBeat());
factory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
factory.setConnectionCacheSize(2);
return factory;
}
@Primary
@Bean("connection_A_RabbitAdmin")
public RabbitAdmin connection_A_RabbitAdmin(@Qualifier("connection_A_RabbitTemplate") RabbitTemplate connection_A_RabbitTemplate) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connection_A_RabbitTemplate);
rabbitAdmin.setExplicitDeclarationsOnly(true);
return rabbitAdmin;
}
@Bean("connection_B_RabbitAdmin")
public RabbitAdmin connection_B_RabbitAdmin(@Qualifier("connection_B_RabbitTemplate") RabbitTemplate connection_B_RabbitTemplate) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connection_B_RabbitTemplate);
rabbitAdmin.setExplicitDeclarationsOnly(true);
return rabbitAdmin;
}
/**
* each declarable has been configured with a AbstractDeclarable.setAdminsThatShouldDeclare() method which contains the particular admin bean
* for which declarable should be processed. i have checked this is getting filtered out correctly for each of the rabbit admins.
**/
@Bean("connection_A_Declarable")
public Declarables connection_A_Declarable(@Qualifier("connection_A_RabbitAdmin") RabbitAdmin connection_A_RabbitAdmin) {
return DeclarableMQFactory.builder().rabbitAdminList(Collections.singletonList(connection_A_RabbitAdmin))
.baseProperties(connectionAMQProperties)
.queueNames(Collections.singletonList(connectionAMQProperties.getQueue()))
.build();
}
@Bean("connection_B_Declarable")
public Declarables connection_B_Declarable(@Qualifier("connection_B_RabbitAdmin") RabbitAdmin connection_B_RabbitAdmin){
return DeclarableMQFactory.builder().rabbitAdminList(Collections.singletonList(connection_B_RabbitAdmin))
.baseProperties(connectionBMQProperties)
.queueNames(Collections.singletonList(connectionBMQProperties.getQueue()))
.build();
}
@Primary
@Bean("connection_A_RabbitTemplate")
public RabbitTemplate connection_A_RabbitTemplate(@Qualifier("connectionFactoryA")ConnectionFactory connectionFactoryA) {
return rabbitTemplate(connectionFactoryA,connectionAMQProperties);
}
@Bean("connection_B_RabbitTemplate")
public RabbitTemplate connection_B_RabbitTemplate(@Qualifier("connectionFactoryB")ConnectionFactory connectionFactoryB) {
return rabbitTemplate(connectionFactoryB,connectionBMQProperties);
}
private RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,BaseProperties baseProperties) {
RabbitTemplate template = new RabbitTemplate();
template.setConnectionFactory(connectionFactory);
template.setExchange(baseProperties.getExchange());
template.setRoutingKey(baseProperties.getQueueName());
template.setDefaultReceiveQueue(baseProperties.getQueueName());
return template;
}
@Primary
@Bean(name = "connection_A_ContainerFactory")
public SimpleRabbitListenerContainerFactory connection_A_ContainerFactory(@Qualifier("connectionFactoryA")ConnectionFactory connectionFactoryA) {
return simpleRabbitListenerContainerFactory(connectionFactoryA,connectionAMQProperties);
}
@Bean(name = "connection_B_ContainerFactory")
public SimpleRabbitListenerContainerFactory connection_B_ContainerFactory(@Qualifier("connectionFactoryB")ConnectionFactory connectionFactoryB){
return simpleRabbitListenerContainerFactory(connectionFactoryB,connectionBMQProperties);
}
private SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory, BaseProperties baseProperties) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
factory.setConcurrentConsumers(baseProperties.getConcurrentConsumer());
factory.setMaxConcurrentConsumers(baseProperties.getMaxConcurrentConsumer());
return factory;
}
}
并且侦听器被配置为(类似于第二个侦听器)
@RabbitListener(
queues = "${connection_a.queue}",
containerFactory = "connectionFactoryA"
)
一些日志(编辑以隐藏一些信息)(在这里你可以看到交换和队列都被各自的管理员正确声明)。
o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: **-**-rabbitmq.qa2-sg.cld:5672
o.s.a.r.c.CachingConnectionFactory : Created new connection: **ConnectionFactory#5cba890e:0/SimpleConnection@513bec8c [delegate=amqp://client-user@172.27.**.**:5672/, localPort= 64992]
o.s.retry.support.RetryTemplate : Retry: count=0
o.s.amqp.rabbit.core.RabbitAdmin : Initializing declarations
o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://client-user@172.27.**.**:5672/,1)
o.s.amqp.rabbit.core.RabbitTemplate : Executing callback RabbitAdmin$$Lambda$1063/0x00000008007e3c40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://client-user@172.27.**.**:5672/,1), conn: Proxy@1d6014a7 Shared Rabbit Connection: SimpleConnection@513bec8c [delegate=amqp://client-user@172.27.**.**:5672/, localPort= 64992]
o.s.amqp.rabbit.core.RabbitAdmin : declaring Exchange 'com.**.**.**.connectionA.exchange'
o.s.amqp.rabbit.core.RabbitAdmin : declaring Exchange 'com.**.**.**.connectionA.exchange.DLX'
o.s.amqp.rabbit.core.RabbitAdmin : declaring Queue 'com.**.**.**.**.queue.connectionA.**'
o.s.amqp.rabbit.core.RabbitAdmin : declaring Queue 'com.**.**.**.**.queue.connectionA.**.DLX'
o.s.amqp.rabbit.core.RabbitAdmin : Binding destination [com.**.**.**.**.queue.**.** (QUEUE)] to exchange [com.**.**.**.**.exchange] with routing key [com.**.**.**.**.queue.**.**]
o.s.amqp.rabbit.core.RabbitAdmin : Binding destination [com.**.**.**.**.queue.**.**.DLX (QUEUE)] to exchange [com.**.**.**.**.exchange.DLX] with routing key [com.**.**.**.**.queue.**.**.DLX]
o.s.amqp.rabbit.core.RabbitAdmin : Declarations finished
o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: message-**-rabbitmq.qa2-sg.cld:5672
o.s.a.r.c.CachingConnectionFactory : Created new connection: **ConnectionFactory#dbca149:0/SimpleConnection@59cb10e0 [delegate=amqp://client-user@172.27.**.**:5672/, localPort= 64994]
o.s.retry.support.RetryTemplate : Retry: count=0
o.s.amqp.rabbit.core.RabbitAdmin : Initializing declarations
o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://client-user@172.27.**.**:5672/,1)
o.s.amqp.rabbit.core.RabbitTemplate : Executing callback RabbitAdmin$$Lambda$1063/0x00000008007e3c40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://client-user@172.27.**.**:5672/,1), conn: Proxy@3d763ae5 Shared Rabbit Connection: SimpleConnection@59cb10e0 [delegate=amqp://client-user@172.27.**.**:5672/, localPort= 64994]
o.s.amqp.rabbit.core.RabbitAdmin : declaring Exchange 'com.**.**.**.**.connectionB.exchange'
o.s.amqp.rabbit.core.RabbitAdmin : declaring Exchange 'com.**.**.**.**.connectionB.exchange.DLX'
o.s.amqp.rabbit.core.RabbitAdmin : declaring Queue 'com.**.**.**.**.queue.connectionB.**'
o.s.amqp.rabbit.core.RabbitAdmin : declaring Queue 'com.**.**.**.**.queue.connectionB.**.DLX'
o.s.amqp.rabbit.core.RabbitAdmin : Binding destination [com.**.**.**.**.queue.**.** (QUEUE)] to exchange [com.**.**.**.**.**.exchange] with routing key [com.**.**.**.**.queue.**.**]
o.s.amqp.rabbit.core.RabbitAdmin : Binding destination [com.**.**.**.**.queue.**.**.DLX (QUEUE)] to exchange [com.**.**.**.**.**.exchange.DLX] with routing key [com.**.**.**.**.queue.**.**.DLX]
o.s.amqp.rabbit.core.RabbitAdmin : Declarations finished
解决方案
这对我来说按预期工作:
@SpringBootApplication
public class So62382630Application {
public static void main(String[] args) {
SpringApplication.run(So62382630Application.class, args);
}
@Bean
@Primary
ConnectionFactory cf1() {
return new CachingConnectionFactory("localhost");
}
@Bean
ConnectionFactory cf2() {
return new CachingConnectionFactory("10.0.0.21");
}
@Bean
RabbitAdmin admin1() {
return new RabbitAdmin(cf1());
}
@Bean
RabbitAdmin admin2() {
return new RabbitAdmin(cf2());
}
@Bean
Queue q1() {
Queue queue = new Queue("q1");
queue.setAdminsThatShouldDeclare(admin1());
return queue;
}
@Bean
Queue q2() {
Queue queue = new Queue("q2");
queue.setAdminsThatShouldDeclare(admin2());
return queue;
}
@Bean
public ApplicationRunner runner(RabbitAdmin admin1, RabbitAdmin admin2) {
return args -> {
System.out.println(admin1.getQueueInfo("q1"));
System.out.println(admin2.getQueueInfo("q2"));
};
}
}
我看到在各个节点上声明的队列。
推荐阅读
- sql - 在 SQL 查询连接中引用 Excel 范围
- angular - Ionic 5:DateTime 在 SegmentButton 中不起作用
- python - 如何使用 for 循环按列计算 numpy 矩阵的平均值?
- excel - 更改按钮颜色
- selenium - Selenium WebDriver ChromeDriver 2.6GB 的“修改内存”并不断增加
- python - TensorFlow Object Detection API - 并非所有类都被检测到
- flutter - Flutter:如何在 appbar 小部件中包含切换按钮?
- python - 在 jenkinsfile 中运行诗歌
- php - 如何验证 laravel 的另一个连接中是否存在雄辩的父模型?
- image - 使用 terraform 错误从自定义图像创建 azure vm