首页 > 解决方案 > 使用spring cloud stream的队列监听器行为

问题描述

在此处输入图像描述

我正在尝试使用 spring 云流供应商和消费者来实现上述场景。

  1. 这个应用程序是一个包含生产者和消费者的单一 Spring Boot 应用程序。
  2. 有一个生产者和(可以是)多个消费者。所有消费者都应该作为客户端来排队(即单个消息应该只由单个消费者接收),而其他消费者接收不同的消息。

下面是java类

 @Component
public class MultipleFunctionsApplication {
    
    @Bean
    public Consumer<String> sink1() {
        return message -> {         
            System.out.println(new Date() + "----------->>> sink1 - Received message " + message);
        };
    }

    @Bean
    public Consumer<String> sink2() {
        return message -> {         
            System.out.println(new Date() + "----------->>> sink2 - Received message " + message);
        };
    }

}

我正在尝试使用消费者组功能来实现这一点,如下所示。

spring:
  cloud:
    stream:
      bindings:
        requester1:
          destination: rss-exchange
          group: requester
        requester2:
          destination: rss-exchange
          group: requester
      function:
        bindings:
          sink1-in-0: requester1
          sink2-in-0: requester2          
        definition: sink1;sink2
  application:
    name: rss

当我启动应用程序时,出现以下错误。

Caused by: org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'rss-exchange.requester.errors.recoverer' defined in null: Cannot register bean definition [Generic bean: class [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer]; scope=singleton; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] for bean 'rss-exchange.requester.errors.recoverer': There is already [Generic bean: class [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer]; scope=singleton; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] bound.
at org.springframework.beans.factory.support.DefaultListableBeanFactory.registerBeanDefinition(DefaultListableBeanFactory.java:995) ~[spring-beans-5.3.5.jar:5.3.5]
at org.springframework.context.support.GenericApplicationContext.registerBeanDefinition(GenericApplicationContext.java:330) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.beans.factory.support.BeanDefinitionReaderUtils.registerBeanDefinition(BeanDefinitionReaderUtils.java:164) ~[spring-beans-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotatedBeanDefinitionReader.doRegisterBean(AnnotatedBeanDefinitionReader.java:285) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotatedBeanDefinitionReader.registerBean(AnnotatedBeanDefinitionReader.java:233) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotationConfigApplicationContext.registerBean(AnnotationConfigApplicationContext.java:198) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:687) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:639) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:525) ~[spring-cloud-stream-binder-rabbit-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:136) ~[spring-cloud-stream-binder-rabbit-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:408) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
... 24 common frames omitted

从日志中可以清楚地看出它正在尝试再次创建“rss-exchange.requester.errors.recoverer”。在这种情况下,只有 sink1 以以下消息开始。

Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chn19388","rssurl":"url random"}
Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chnl02394","rssurl":"http://localhost:8080/test"}
Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chnldkdjaif","rssurl":"url random"}

当我添加“allow-bean-definition-overriding:true”时,一切都按预期正常工作,如下日志所示。

Fri Aug 27 15:03:57 IST 2021----------->>> sink1 - Received message {"channel":"chn19388","rssurl":"url random"}
Fri Aug 27 15:03:57 IST 2021----------->>> sink2 - Received message {"channel":"chnl02394","rssurl":"http://localhost:8080/test"}
Fri Aug 27 15:03:57 IST 2021----------->>> sink2 - Received message {"channel":"chnldkdjaif","rssurl":"url random"}

我不确定这样做是否正确,因为即使我正在尝试的用例正在使用覆盖属性,我也会收到 bean 的错误。

注意 - 自从我开始探索流云流以来只有几天,所以如果我问了一些愚蠢的问题,请认为我很天真。

标签: rabbitmqspring-cloud-stream

解决方案


因此,该问题已通过您的配置得到修复和测试、合并,并且在当前快照 (3.2.0-SNAPSHOT) 中可用。


推荐阅读