spring-cloud-stream - 错误:无法指定多个目的地
问题描述
我正在尝试配置将消息发送到两个输出流,如下所示。
@StreamListener(SampleBinding.INPUT1)
@SendTo({SampleBinding.OUTPUT1, SampleBinding.OUTPUT2})
public String handleM(String sampleMessage){
log.info("Received message="+sampleMessage.toString());
sampleMessage=sampleMessage.toUpperCase();
return sampleMessage;
}
我按照这个例子
它给出了以下错误。
java.lang.IllegalArgumentException:无法在 org.springframework.cloud.stream.binding.StreamListenerMethodUtils.getOutboundBindingTargetName(StreamListenerMethodUtils.java:146) 的 org.springframework.util.Assert.isTrue(Assert.java:118) 指定多个目的地org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$DefaultStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(StreamListenerAnnotationBeanPostProcessor.java:349) at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:195) at org.springframework.cloud. stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:167) 在 java.lang.Iterable.forEach(Iterable.java:75) 在 org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) 在 org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor .afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) at org.springframework.boot.web.servle 上的 org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) 上的 lang.Iterable.forEach(Iterable.java:75) java:105) 在 org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) 在 org.springframework.context 的 org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) .support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) 在 org.springframework.boot.web.servleorg.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) 上的 lang.Iterable.forEach(Iterable.java:75) java:105) 在 org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) 在 org.springframework.context 的 org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) .support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) 在 org.springframework.boot.web.servlejava:75) at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) .beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext .java:546) 在 org.springframework.boot.web.servlejava:75) at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) .beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext .java:546) 在 org.springframework.boot.web.servleStreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java) 863) 在 org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) 在 org.springframework.boot.web.servle 的 org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863)StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java) 863) 在 org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) 在 org.springframework.boot.web.servle 的 org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863)285) 在 org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) 在 org.springframework.context 的 org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) .support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) 在 org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) 在 org.springframework.boot.web.servle285) 在 org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) 在 org.springframework.context 的 org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) .support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) 在 org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) 在 org.springframework.boot.web.servleDefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) at org .springframework.boot.web.servleDefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) at org .springframework.boot.web.servle
解决方案
StreamListener
使用基于常规的活页夹时,无法像您从方法中描述的那样发送到多个目的地MessageChannel
。可以使用您在上面提供的链接中提到的 Kafka Streams binder 的分支功能发送到多个主题。如果要发送到应用程序中的多个目的地,一种选择是使用 Spring Cloud Stream 的动态目的地功能。这是动态目的地如何工作的示例。
推荐阅读
- c++ - 文件列表小部件
- javascript - 等待用户点击独立的 2fa 链接?
- angular - 即使编译成功,也无法在浏览器中获得输出
- sql - 在 oracle 链接服务器上打开查询返回错误
- javascript - 奇怪的行为 getBoundingClientRect() 和 F5/reload,这是怎么回事?
- javascript - 使用 jQuery 在字段输入时增加计数器
- c# - 取消订阅股票更新时,带有 Binance.Net 的 Dot Net Core C# Winforms 应用程序挂起
- jsx - 如何在此 Photoshop 脚本中获得 var theArray1 和 var theArray2 的对话/提示?
- google-cloud-data-fusion - Cloud Data Fusion - 监控管道运行
- python - 使用 Python 在 Web 服务和 r 模型的 Windows 中进行 docker 本地部署的问题