首页 > 解决方案 > 使用 1 个通道进行两个不同进程的 Spring 集成

问题描述

我有一个问题,我无法将进程设置为并行进入两个通道,它要么是一个,要么是另一个。我有一个ServiceActivator和一个Transformer具有相同输入通道的objectOutputChannel。去之后processChannel它应该去两者,但它只去其中一个,并且在每个请求中都不同。我该怎么做才能让它按我想要的方式工作?

    @Bean
    public TcpReceivingChannelAdapter channelAdapter(AbstractServerConnectionFactory connectionFactory) {
        final MsgReceivingChannelAdapter adapter = new MsgReceivingChannelAdapter();
        adapter.setConnectionFactory(connectionFactory);
        adapter.setOutputChannel(messageChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "outputChannel")
    public TcpSendingMessageHandler messageHandler(AbstractServerConnectionFactory connectionFactory){
        final MsgSendingMessageHandler handler = new MsgSendingMessageHandler();
        handler.setConnectionFactory(connectionFactory);
        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "objectOutputChannel")
    public AmqpOutboundEndpoint objectOutboundEndpoint() {
        return Amqp
                .outboundAdapter(rabbitTemplate)
                .exchangeName(objectExchange)
                .get();
    }

    @Bean
    @Transformer(inputChannel = "messageChannel", outputChannel = "loggingChannel")
    public ObjectToStringTransformer loggingTransformer() {
        return new ObjectToStringTransformer();
    }

    @ServiceActivator(inputChannel = "loggingChannel")
    public void loggingService(String message) {
        messageLogger.info(message);
    }

    @Bean
    @Transformer(inputChannel = "messageChannel", outputChannel = "processChannel")
    public ObjectDeserializer objectDeserializer() {
        return new ObjectDeserializer();
    }

    @ServiceActivator(inputChannel = "processChannel", outputChannel = "objectOutputChannel")
    public MyObject processService(MyObject object) {
        return objectService.check(object);
    }

    @Bean
    @Transformer(inputChannel = "objectOutputChannel", outputChannel = "outputChannel")
    public ObjectSerializer objectSerializer() {
        return new ObjectSerializer();
    }

    @Bean(name = "loggingChannel")
    public MessageChannel loggingChannel() {
        return new DirectChannel();
    }

    @Bean(name = "outputChannel")
    public MessageChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean(name = "messageChannel")
    public MessageChannel messageChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean(name = "objectOutputChannel")
    public MessageChannel objectOutputChannel() {
        return new PublishSubscribeChannel());

案例 A 的调试日志- 不进入objectOutboundEndpoint()和进入objectSerializer()

2021-02-09 11:23:40.221 DEBUG 8964 --- [pool-4-thread-2] o.s.i.i.tcp.connection.TcpNetConnection  : Message received GenericMessage [payload=...]
2021-02-09 11:23:40.223 DEBUG 8964 --- [pool-4-thread-2] o.s.i.channel.PublishSubscribeChannel    : preSend on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler       : bean 'objectTcpController.loggingTransformer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler   : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:23:40.231 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler   : handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler)' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:23:40.233 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.233 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler       : bean 'objectTcpController.objectDeserializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:40.258 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.258 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler   : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@44e043f] (objectTcpController.processService.serviceActivator.handler) received message: GenericMessage [payload=..]
2021-02-09 11:23:42.383 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.385 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler       : bean 'objectTcpController.objectSerializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'outputChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@66434cc8'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] .i.h.ReplyProducingMessageHandlerWrapper : bean 'objectTcpController.messageHandler.serviceActivator.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] o.s.i.ip.tcp.TcpSendingMessageHandler    : bean 'messageHandler'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@38499e48' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.389 DEBUG 8964 --- [pool-4-thread-2] .i.h.ReplyProducingMessageHandlerWrapper : handler 'bean 'objectTcpController.messageHandler.serviceActivator.handler'' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:23:42.390 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'outputChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@66434cc8'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.390 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.391 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.391 DEBUG 8964 --- [pool-4-thread-2] o.s.i.channel.PublishSubscribeChannel    : postSend (sent=true) on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]

案例 B 的调试日志- 不进入objectSerializer()和进入objectOutboundEndpoint()

2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.i.tcp.connection.TcpNetConnection  : Message received GenericMessage [payload=...]
2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.channel.PublishSubscribeChannel    : preSend on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.t.MessageTransformingHandler       : bean 'objectTcpController.loggingTransformer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler   : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler   : handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler)' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.t.MessageTransformingHandler       : bean 'objectTcpController.objectDeserializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:28:19.175 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.175 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler   : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@44e043f] (objectTcpController.processService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:28:22.572 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:28:22.572 DEBUG 8964 --- [pool-4-thread-3] o.s.i.a.outbound.AmqpOutboundEndpoint    : bean 'objectOutboundEndpoint'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1a5b8489' received message: GenericMessage [payload=...]
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_tcp_remotePort] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_connectionId] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_localInetAddress] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_address] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[history] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_hostname] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.635 DEBUG 8964 --- [pool-4-thread-3] o.s.i.a.outbound.AmqpOutboundEndpoint    : handler 'bean 'objectOutboundEndpoint'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1a5b8489'' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:28:22.635 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:28:22.639 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'processChannel'', message: GenericMessage [payload=..]
2021-02-09 11:28:22.639 DEBUG 8964 --- [pool-4-thread-3] o.s.i.channel.PublishSubscribeChannel    : postSend (sent=true) on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]

标签: javaspringspring-integration

解决方案


您所描述的不是关于objectOutputChannel或,messageChannel因为它们都是PublishSubscribeChannel类实例。您所描述的是一些DirectChannel具有循环默认调度策略的内容。但由于我们没有几个订阅者直接向您展示我们的频道,那么问题就出在其他地方。

我建议您打开消息历史记录并调查类别的调试日志,org.springframework.integration以查看您的消息如何通过流传输。在那里你可以找出哪些渠道是有罪的。

您在代码片段中显示的内容是正确的,并且看起来与您所描述的内容不相关......

更新

这是我们在您的日志中看到的:

o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]

所以,你objectOutputChannel真的是一个DirectChannel实例,它的 bean 在ObjectIntegrationConfiguration. 如果该配置不是您在问题中显示的内容,那么您有一个“bean 定义覆盖”竞争条件,并且您PublishSubscribeChannelDirectChannel. 请修改您的项目配置并尝试找到您声明该objectOutputChannelbean 的其他位置。


推荐阅读