首页 > 解决方案 > rabbitMQ 创建两个独立的消费者,他们从同一服务中的两个不同集群消费

问题描述

我正在尝试创建两个单独的消费者,他们从同一服务中的两个不同集群中消费。

我已尝试创建新连接,但仍然仅在其中一个集群中创建了两个交换。

我在这里遗漏了什么?我正在使用 spring-boot:2.2.5.RELEASE 和 spring-rabbit:2.2.5.RELEASE

我的配置是这样的。

@Configuration
@AllArgsConstructor
public class RabbitMQConfiguration {

  private final Connection_A_MQProperties connectionAMQProperties;
  private final Connection_B_MQProperties connectionBMQProperties;

  @Primary
  @Bean
  public ConnectionFactory connectionFactoryA(Connection_A_MQProperties connectionAMQProperties) {
    return createConnection(connectionAMQProperties.getBaseProperties());
  }

  @Bean
  public ConnectionFactory connectionFactoryB(Connection_B_MQProperties connectionBMQProperties) {
    return createConnection(connectionBMQProperties.getBaseProperties());
  }

  private ConnectionFactory createConnection(BaseProperties baseProperties){
     CachingConnectionFactory factory = new CachingConnectionFactory();
    factory.setHost(baseProperties.getHost());
    factory.setPort(baseProperties.getPort());
    factory.setUsername(baseProperties.getUsername());
    factory.setPassword(baseProperties.getPassword());
    factory.setConnectionTimeout(baseProperties.getConnectionTimeout());
    factory.setRequestedHeartBeat(baseProperties.getRequestedHeartBeat());
    factory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
    factory.setConnectionCacheSize(2);

    return factory;
  }

  @Primary
  @Bean("connection_A_RabbitAdmin")
  public RabbitAdmin connection_A_RabbitAdmin(@Qualifier("connection_A_RabbitTemplate") RabbitTemplate connection_A_RabbitTemplate) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connection_A_RabbitTemplate);

    rabbitAdmin.setExplicitDeclarationsOnly(true);
    return rabbitAdmin;
  }

  @Bean("connection_B_RabbitAdmin")
  public RabbitAdmin connection_B_RabbitAdmin(@Qualifier("connection_B_RabbitTemplate") RabbitTemplate connection_B_RabbitTemplate) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connection_B_RabbitTemplate);
    rabbitAdmin.setExplicitDeclarationsOnly(true);
    return rabbitAdmin;
  }

  /**
  * each declarable has been configured with a AbstractDeclarable.setAdminsThatShouldDeclare() method which contains the particular admin bean 
  * for which declarable should be processed. i have checked this is getting filtered out correctly for each of the rabbit admins.
  **/

  @Bean("connection_A_Declarable")
  public Declarables connection_A_Declarable(@Qualifier("connection_A_RabbitAdmin") RabbitAdmin connection_A_RabbitAdmin) {
    return DeclarableMQFactory.builder().rabbitAdminList(Collections.singletonList(connection_A_RabbitAdmin))
      .baseProperties(connectionAMQProperties)
      .queueNames(Collections.singletonList(connectionAMQProperties.getQueue()))
      .build();
  }

  @Bean("connection_B_Declarable")
  public Declarables connection_B_Declarable(@Qualifier("connection_B_RabbitAdmin") RabbitAdmin connection_B_RabbitAdmin){
    return DeclarableMQFactory.builder().rabbitAdminList(Collections.singletonList(connection_B_RabbitAdmin))
      .baseProperties(connectionBMQProperties)
      .queueNames(Collections.singletonList(connectionBMQProperties.getQueue()))
      .build();
  }


  @Primary
  @Bean("connection_A_RabbitTemplate")
  public RabbitTemplate connection_A_RabbitTemplate(@Qualifier("connectionFactoryA")ConnectionFactory connectionFactoryA) {
        return rabbitTemplate(connectionFactoryA,connectionAMQProperties);
  }

  @Bean("connection_B_RabbitTemplate")
  public RabbitTemplate connection_B_RabbitTemplate(@Qualifier("connectionFactoryB")ConnectionFactory connectionFactoryB) {
    return rabbitTemplate(connectionFactoryB,connectionBMQProperties);
  }

  private RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,BaseProperties baseProperties) {
    RabbitTemplate template = new RabbitTemplate();
    template.setConnectionFactory(connectionFactory);
    template.setExchange(baseProperties.getExchange());
    template.setRoutingKey(baseProperties.getQueueName());
    template.setDefaultReceiveQueue(baseProperties.getQueueName());
    return template;
  }

  @Primary
  @Bean(name = "connection_A_ContainerFactory")
  public SimpleRabbitListenerContainerFactory connection_A_ContainerFactory(@Qualifier("connectionFactoryA")ConnectionFactory connectionFactoryA) {
    return simpleRabbitListenerContainerFactory(connectionFactoryA,connectionAMQProperties);
  }

  @Bean(name = "connection_B_ContainerFactory")
  public SimpleRabbitListenerContainerFactory connection_B_ContainerFactory(@Qualifier("connectionFactoryB")ConnectionFactory connectionFactoryB){
    return simpleRabbitListenerContainerFactory(connectionFactoryB,connectionBMQProperties);
  }

  private SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory, BaseProperties baseProperties) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
    factory.setConcurrentConsumers(baseProperties.getConcurrentConsumer());
    factory.setMaxConcurrentConsumers(baseProperties.getMaxConcurrentConsumer());
    return factory;
  }
}

并且侦听器被配置为(类似于第二个侦听器)

@RabbitListener(
    queues = "${connection_a.queue}",
    containerFactory = "connectionFactoryA"
  )

一些日志(编辑以隐藏一些信息)(在这里你可以看到交换和队列都被各自的管理员正确声明)。

o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: **-**-rabbitmq.qa2-sg.cld:5672
o.s.a.r.c.CachingConnectionFactory       : Created new connection: **ConnectionFactory#5cba890e:0/SimpleConnection@513bec8c [delegate=amqp://client-user@172.27.**.**:5672/, localPort= 64992]
o.s.retry.support.RetryTemplate          : Retry: count=0
o.s.amqp.rabbit.core.RabbitAdmin         : Initializing declarations
o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://client-user@172.27.**.**:5672/,1)
o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$1063/0x00000008007e3c40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://client-user@172.27.**.**:5672/,1), conn: Proxy@1d6014a7 Shared Rabbit Connection: SimpleConnection@513bec8c [delegate=amqp://client-user@172.27.**.**:5672/, localPort= 64992]
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'com.**.**.**.connectionA.exchange'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'com.**.**.**.connectionA.exchange.DLX'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'com.**.**.**.**.queue.connectionA.**'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'com.**.**.**.**.queue.connectionA.**.DLX'
o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [com.**.**.**.**.queue.**.** (QUEUE)] to exchange [com.**.**.**.**.exchange] with routing key [com.**.**.**.**.queue.**.**]
o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [com.**.**.**.**.queue.**.**.DLX (QUEUE)] to exchange [com.**.**.**.**.exchange.DLX] with routing key [com.**.**.**.**.queue.**.**.DLX]
o.s.amqp.rabbit.core.RabbitAdmin         : Declarations finished



o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: message-**-rabbitmq.qa2-sg.cld:5672
o.s.a.r.c.CachingConnectionFactory       : Created new connection: **ConnectionFactory#dbca149:0/SimpleConnection@59cb10e0 [delegate=amqp://client-user@172.27.**.**:5672/, localPort= 64994]
o.s.retry.support.RetryTemplate          : Retry: count=0
o.s.amqp.rabbit.core.RabbitAdmin         : Initializing declarations
o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://client-user@172.27.**.**:5672/,1)
o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$1063/0x00000008007e3c40 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://client-user@172.27.**.**:5672/,1), conn: Proxy@3d763ae5 Shared Rabbit Connection: SimpleConnection@59cb10e0 [delegate=amqp://client-user@172.27.**.**:5672/, localPort= 64994]
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'com.**.**.**.**.connectionB.exchange'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'com.**.**.**.**.connectionB.exchange.DLX'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'com.**.**.**.**.queue.connectionB.**'
o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'com.**.**.**.**.queue.connectionB.**.DLX'
o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [com.**.**.**.**.queue.**.** (QUEUE)] to exchange [com.**.**.**.**.**.exchange] with routing key [com.**.**.**.**.queue.**.**]
o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [com.**.**.**.**.queue.**.**.DLX (QUEUE)] to exchange [com.**.**.**.**.**.exchange.DLX] with routing key [com.**.**.**.**.queue.**.**.DLX]
o.s.amqp.rabbit.core.RabbitAdmin         : Declarations finished

标签: spring-bootrabbitmqspring-amqpspring-rabbit

解决方案


这对我来说按预期工作:

@SpringBootApplication
public class So62382630Application {

    public static void main(String[] args) {
        SpringApplication.run(So62382630Application.class, args);
    }

    @Bean
    @Primary
    ConnectionFactory cf1() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    ConnectionFactory cf2() {
        return new CachingConnectionFactory("10.0.0.21");
    }

    @Bean
    RabbitAdmin admin1() {
        return new RabbitAdmin(cf1());
    }

    @Bean
    RabbitAdmin admin2() {
        return new RabbitAdmin(cf2());
    }

    @Bean
    Queue q1() {
        Queue queue = new Queue("q1");
        queue.setAdminsThatShouldDeclare(admin1());
        return queue;
    }

    @Bean
    Queue q2() {
        Queue queue = new Queue("q2");
        queue.setAdminsThatShouldDeclare(admin2());
        return queue;
    }

    @Bean
    public ApplicationRunner runner(RabbitAdmin admin1, RabbitAdmin admin2) {
        return args -> {
            System.out.println(admin1.getQueueInfo("q1"));
            System.out.println(admin2.getQueueInfo("q2"));
        };
    }

}

我看到在各个节点上声明的队列。


推荐阅读