rabbitmq - 使用spring cloud stream的队列监听器行为
问题描述
我正在尝试使用 spring 云流供应商和消费者来实现上述场景。
- 这个应用程序是一个包含生产者和消费者的单一 Spring Boot 应用程序。
- 有一个生产者和(可以是)多个消费者。所有消费者都应该作为客户端来排队(即单个消息应该只由单个消费者接收),而其他消费者接收不同的消息。
下面是java类
@Component
public class MultipleFunctionsApplication {
@Bean
public Consumer<String> sink1() {
return message -> {
System.out.println(new Date() + "----------->>> sink1 - Received message " + message);
};
}
@Bean
public Consumer<String> sink2() {
return message -> {
System.out.println(new Date() + "----------->>> sink2 - Received message " + message);
};
}
}
我正在尝试使用消费者组功能来实现这一点,如下所示。
spring:
cloud:
stream:
bindings:
requester1:
destination: rss-exchange
group: requester
requester2:
destination: rss-exchange
group: requester
function:
bindings:
sink1-in-0: requester1
sink2-in-0: requester2
definition: sink1;sink2
application:
name: rss
当我启动应用程序时,出现以下错误。
Caused by: org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'rss-exchange.requester.errors.recoverer' defined in null: Cannot register bean definition [Generic bean: class [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer]; scope=singleton; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] for bean 'rss-exchange.requester.errors.recoverer': There is already [Generic bean: class [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer]; scope=singleton; abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] bound.
at org.springframework.beans.factory.support.DefaultListableBeanFactory.registerBeanDefinition(DefaultListableBeanFactory.java:995) ~[spring-beans-5.3.5.jar:5.3.5]
at org.springframework.context.support.GenericApplicationContext.registerBeanDefinition(GenericApplicationContext.java:330) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.beans.factory.support.BeanDefinitionReaderUtils.registerBeanDefinition(BeanDefinitionReaderUtils.java:164) ~[spring-beans-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotatedBeanDefinitionReader.doRegisterBean(AnnotatedBeanDefinitionReader.java:285) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotatedBeanDefinitionReader.registerBean(AnnotatedBeanDefinitionReader.java:233) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.annotation.AnnotationConfigApplicationContext.registerBean(AnnotationConfigApplicationContext.java:198) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:687) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:639) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:525) ~[spring-cloud-stream-binder-rabbit-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder.createConsumerEndpoint(RabbitMessageChannelBinder.java:136) ~[spring-cloud-stream-binder-rabbit-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:408) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
... 24 common frames omitted
从日志中可以清楚地看出它正在尝试再次创建“rss-exchange.requester.errors.recoverer”。在这种情况下,只有 sink1 以以下消息开始。
Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chn19388","rssurl":"url random"}
Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chnl02394","rssurl":"http://localhost:8080/test"}
Fri Aug 27 15:01:22 IST 2021----------->>> sink1 - Received message {"channel":"chnldkdjaif","rssurl":"url random"}
当我添加“allow-bean-definition-overriding:true”时,一切都按预期正常工作,如下日志所示。
Fri Aug 27 15:03:57 IST 2021----------->>> sink1 - Received message {"channel":"chn19388","rssurl":"url random"}
Fri Aug 27 15:03:57 IST 2021----------->>> sink2 - Received message {"channel":"chnl02394","rssurl":"http://localhost:8080/test"}
Fri Aug 27 15:03:57 IST 2021----------->>> sink2 - Received message {"channel":"chnldkdjaif","rssurl":"url random"}
我不确定这样做是否正确,因为即使我正在尝试的用例正在使用覆盖属性,我也会收到 bean 的错误。
注意 - 自从我开始探索流云流以来只有几天,所以如果我问了一些愚蠢的问题,请认为我很天真。
解决方案
因此,该问题已通过您的配置得到修复和测试、合并,并且在当前快照 (3.2.0-SNAPSHOT) 中可用。
推荐阅读
- excel - 关闭第二个工作簿时 Excel 丢失数据
- json - JSON JsonPath -> 获取包含字符串的数组值
- java - 如何使用结果/上传的 jar 找出哪个用户运行了“mvn clean deploy”命令
- rollup - Tree Shaking 问题 - 将 preserveModules 设置为 true 的汇总
- javascript - 为什么图像没有在 Phaser 中加载?
- ssl - 将letsencrypt ssl添加到詹金斯服务器
- c++ - 关于c ++虚函数覆盖的困惑,派生类调用正确的覆盖函数但不输出正确的参数值
- regression - 计算最小二乘法时矩阵的维数问题
- c# - 如何在 AutoCAD c# 中访问和修改文本字段
- ruby-on-rails - 如何使用带有 remote:true 的 simple_form 显示验证失败时生成的错误