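java - Spring Integration: using one channel for two different processes
Problem description
I have a problem: I cannot get a message delivered to two endpoints from one channel. It goes to either one or the other. I have a ServiceActivator and a Transformer that share the same input channel, objectOutputChannel. After processChannel, the message should go to both of them, but it goes to only one, and which one differs from request to request. What can I do to make it work the way I want?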
    @Bean
    public TcpReceivingChannelAdapter channelAdapter(AbstractServerConnectionFactory connectionFactory) {
        final MsgReceivingChannelAdapter adapter = new MsgReceivingChannelAdapter();
        adapter.setConnectionFactory(connectionFactory);
        adapter.setOutputChannel(messageChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "outputChannel")
    public TcpSendingMessageHandler messageHandler(AbstractServerConnectionFactory connectionFactory) {
        final MsgSendingMessageHandler handler = new MsgSendingMessageHandler();
        handler.setConnectionFactory(connectionFactory);
        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "objectOutputChannel")
    public AmqpOutboundEndpoint objectOutboundEndpoint() {
        return Amqp
                .outboundAdapter(rabbitTemplate)
                .exchangeName(objectExchange)
                .get();
    }

    @Bean
    @Transformer(inputChannel = "messageChannel", outputChannel = "loggingChannel")
    public ObjectToStringTransformer loggingTransformer() {
        return new ObjectToStringTransformer();
    }

    @ServiceActivator(inputChannel = "loggingChannel")
    public void loggingService(String message) {
        messageLogger.info(message);
    }

    @Bean
    @Transformer(inputChannel = "messageChannel", outputChannel = "processChannel")
    public ObjectDeserializer objectDeserializer() {
        return new ObjectDeserializer();
    }

    @ServiceActivator(inputChannel = "processChannel", outputChannel = "objectOutputChannel")
    public MyObject processService(MyObject object) {
        return objectService.check(object);
    }

    @Bean
    @Transformer(inputChannel = "objectOutputChannel", outputChannel = "outputChannel")
    public ObjectSerializer objectSerializer() {
        return new ObjectSerializer();
    }

    @Bean(name = "loggingChannel")
    public MessageChannel loggingChannel() {
        return new DirectChannel();
    }

    @Bean(name = "outputChannel")
    public MessageChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean(name = "messageChannel")
    public MessageChannel messageChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean(name = "objectOutputChannel")
    public MessageChannel objectOutputChannel() {
        return new PublishSubscribeChannel();
    }
Debug log for case A: the message does not reach objectOutboundEndpoint() but does reach objectSerializer()
2021-02-09 11:23:40.221 DEBUG 8964 --- [pool-4-thread-2] o.s.i.i.tcp.connection.TcpNetConnection : Message received GenericMessage [payload=...]
2021-02-09 11:23:40.223 DEBUG 8964 --- [pool-4-thread-2] o.s.i.channel.PublishSubscribeChannel : preSend on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler : bean 'objectTcpController.loggingTransformer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:23:40.231 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler : handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler)' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:23:40.233 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.233 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler : bean 'objectTcpController.objectDeserializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:40.258 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.258 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@44e043f] (objectTcpController.processService.serviceActivator.handler) received message: GenericMessage [payload=..]
2021-02-09 11:23:42.383 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.385 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler : bean 'objectTcpController.objectSerializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'outputChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@66434cc8'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] .i.h.ReplyProducingMessageHandlerWrapper : bean 'objectTcpController.messageHandler.serviceActivator.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] o.s.i.ip.tcp.TcpSendingMessageHandler : bean 'messageHandler'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@38499e48' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.389 DEBUG 8964 --- [pool-4-thread-2] .i.h.ReplyProducingMessageHandlerWrapper : handler 'bean 'objectTcpController.messageHandler.serviceActivator.handler'' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:23:42.390 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'outputChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@66434cc8'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.390 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.391 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.391 DEBUG 8964 --- [pool-4-thread-2] o.s.i.channel.PublishSubscribeChannel : postSend (sent=true) on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]
Debug log for case B: the message does not reach objectSerializer() but does reach objectOutboundEndpoint()
2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.i.tcp.connection.TcpNetConnection : Message received GenericMessage [payload=...]
2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.channel.PublishSubscribeChannel : preSend on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.t.MessageTransformingHandler : bean 'objectTcpController.loggingTransformer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler : handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler)' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.t.MessageTransformingHandler : bean 'objectTcpController.objectDeserializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:28:19.175 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.175 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@44e043f] (objectTcpController.processService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:28:22.572 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:28:22.572 DEBUG 8964 --- [pool-4-thread-3] o.s.i.a.outbound.AmqpOutboundEndpoint : bean 'objectOutboundEndpoint'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1a5b8489' received message: GenericMessage [payload=...]
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_tcp_remotePort] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_connectionId] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_localInetAddress] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_address] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[history] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_hostname] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.635 DEBUG 8964 --- [pool-4-thread-3] o.s.i.a.outbound.AmqpOutboundEndpoint : handler 'bean 'objectOutboundEndpoint'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1a5b8489'' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:28:22.635 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:28:22.639 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'processChannel'', message: GenericMessage [payload=..]
2021-02-09 11:28:22.639 DEBUG 8964 --- [pool-4-thread-3] o.s.i.channel.PublishSubscribeChannel : postSend (sent=true) on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]
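Solution
What you describe is not about objectOutputChannel or messageChannel, since both of them are PublishSubscribeChannel instances. What you describe is the behavior of a DirectChannel, which uses a round-robin dispatching strategy by default. But since none of the direct channels you show us has several subscribers, the problem must be somewhere else.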
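To illustrate the difference between the two dispatching strategies, here is a minimal plain-Java sketch. The types SimpleChannel, RoundRobinChannel, BroadcastChannel and DispatchDemo are hypothetical stand-ins written for this example, not the Spring Integration classes. They only model the idea that a round-robin channel hands each message to a single subscriber in turn, while a publish-subscribe channel hands every message to all subscribers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified channel abstraction (not the Spring MessageChannel API).
interface SimpleChannel {
    void subscribe(Consumer<String> handler);
    void send(String message);
}

// Models round-robin dispatching: one subscriber per message, rotating.
class RoundRobinChannel implements SimpleChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    public void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    public void send(String message) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("no subscribers");
        }
        int index = Math.floorMod(next.getAndIncrement(), subscribers.size());
        subscribers.get(index).accept(message);
    }
}

// Models publish-subscribe dispatching: every subscriber gets every message.
class BroadcastChannel implements SimpleChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    public void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    public void send(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

class DispatchDemo {
    // Sends two requests through the chosen channel model and records who handled them.
    static List<String> run(boolean broadcast) {
        List<String> calls = new ArrayList<>();
        SimpleChannel channel = broadcast ? new BroadcastChannel() : new RoundRobinChannel();
        channel.subscribe(m -> calls.add("objectOutboundEndpoint:" + m));
        channel.subscribe(m -> calls.add("objectSerializer:" + m));
        channel.send("request1");
        channel.send("request2");
        return calls;
    }

    public static void main(String[] args) {
        System.out.println("round-robin: " + run(false));
        System.out.println("broadcast:   " + run(true));
    }
}
```

In the round-robin model each request reaches only one of the two handlers and the handler alternates between requests, which matches the symptom in the question. In the broadcast model both handlers see every request.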
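I suggest you turn on message history and investigate the DEBUG logs for the org.springframework.integration category to see how your messages travel through the flow. There you can find out which channels are at fault.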
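As a rough sketch of that suggestion, message history can be switched on with the @EnableMessageHistory annotation on a configuration class. The class name MessageHistoryConfiguration is made up for this example, and the logging property in the comment assumes a Spring Boot application, which the log format in the question suggests but does not confirm.

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.EnableMessageHistory;

// Hypothetical configuration class; only the annotations matter here.
// With message history enabled, each message carries a "history" header
// listing the channels and endpoints it has passed through.
//
// Assuming Spring Boot, DEBUG logging for the framework can be enabled in
// application.properties with:
//   logging.level.org.springframework.integration=DEBUG
@Configuration
@EnableIntegration
@EnableMessageHistory
public class MessageHistoryConfiguration {
}
```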
What you show in your code snippet is correct and does not look relevant to what you describe...
Update
This is what we see in your logs:
o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
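So your objectOutputChannel really is a DirectChannel instance, and its bean is defined in ObjectIntegrationConfiguration. If that configuration is not the one you show in the question, then you have a "bean definition override" race condition, and your PublishSubscribeChannel is being replaced by the DirectChannel. Please review your project configuration and try to find the other place where you declare that objectOutputChannel bean.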
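One way to confirm which definition wins is to inspect the bean once the application context has started. The following is only a sketch: the ChannelTypeCheck class is a hypothetical helper, and it assumes the class is picked up by component scanning in the same application context as the channel.

```java
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

// Hypothetical startup check: fails fast if objectOutputChannel is not
// the PublishSubscribeChannel the flow expects.
@Component
public class ChannelTypeCheck {

    @EventListener
    public void onContextRefreshed(ContextRefreshedEvent event) {
        ApplicationContext context = event.getApplicationContext();
        MessageChannel channel = context.getBean("objectOutputChannel", MessageChannel.class);

        // Print the runtime type so the winning bean definition is visible.
        System.out.println("objectOutputChannel is a " + channel.getClass().getName());

        if (!(channel instanceof PublishSubscribeChannel)) {
            throw new IllegalStateException(
                    "objectOutputChannel is expected to be a PublishSubscribeChannel but is "
                            + channel.getClass().getName());
        }
    }
}
```

If the check fails, searching the project for other @Bean methods or annotations that declare a channel named objectOutputChannel should reveal the competing definition.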