首页 > 解决方案 > Spring Integration DSL ScatterGather 流块

问题描述

我有一个集成流,它执行分散收集操作,该操作命中多个返回 JSON 的 HTTP 端点。然后将结果聚合到单个 JSON 对象中。流程是这样的

@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
             .handle(HeaderPrinter::headerPrinter)
             .enrichHeaders(httpRequestHeaderEnricher())
             .scatterGather(
                scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .applySequence(true),
                gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
            )
            .handle(myService, "handleAggregatedJson")
            .handle(HeaderPrinter::headerPrinter)
            .handle(myOtherService, "handleMyServiceOutput")
            .channel("myFlow.output");
}

我使用如下声明的网关开始流程

@MessagingGateway
public interface IMyGateway {

    @Gateway(requestChannel = "myFlow.input", replyChannel = "myFlow.output")
    MyResult startFlow(@Payload String payload, @Header("header1") String header1, @Header("header2") String header2);

}

我遇到的问题是整个流程阻塞并且网关超时。我在两个服务调用IMyService::handleAggregatedJsonIMyOutherService::handleMyServiceResult中设置了断点,它们都在运行,但输出永远不会到达网关的回复通道。如果我删除了最后两个句柄操作,则流程通常通过网关返回结果。

我在流被阻塞时查看了堆栈跟踪,我可以看到运行流的线程正在等待锁定

java.lang.Thread.State:在 java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 在 java.util.concurrent 在 sun.misc.Unsafe.park(Unsafe.java:-1) 等待.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) 在 java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) 在 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.acquireSharedInterruptively(AbstractQueuedSynchronizer.1 ) 在 java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) 在 org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:308) 在 org.springframework.messaging.core.GenericMessagingTemplate$ TemporaryReplyChannel.receive(GenericMessagingTemplate.java:300) 在 org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:234) 在 org.springframework.messaging.core.GenericMessagingTemplate 的 org.springframework.messaging.core.GenericMessagingTemplate.doReceive(GenericMessagingTemplate.java:201) .doSendAndReceive(GenericMessagingTemplate.java:47) at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:45) at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) at org. springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:38) at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:95) at org.springframework.messaging.core。AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:85) at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:487) at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:461) at org .springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:520) at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:469) at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean .java:460) 在 org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) 在 org.springframework.aop.framework.JdkDynamicAopProxy。在 com.sun.proxy.$Proxy116.startFlow 调用(JdkDynamicAopProxy.java:212)(未知来源:-1)

我怀疑如果流程花费的时间超过 X 时间,那么它将阻塞。我尝试在流和网关之间放置一个集合通道,但它似乎不起作用。

关于导致超时问题的任何想法?

附录:我一直在修改代码并删除网关上的返回类型,流上的最后一个 .channel 调用似乎停止阻塞它。

以下工作正常

@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
             .handle(HeaderPrinter::headerPrinter)
             .enrichHeaders(httpRequestHeaderEnricher())
             .scatterGather(
                scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .applySequence(true),
                gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
            )
            .handle(myService, "handleAggregatedJson")
            .handle(HeaderPrinter::headerPrinter)
            .handle(myOtherService, "handleMyServiceOutput")
            .handle(m -> {
                log.info("Flow completed successfully, payload as expected:" + payload);
            });
}

标签: javaspringspring-bootspring-integrationspring-integration-dsl

解决方案


我想知道你所有的

.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")

收集后返回一些值。request-reply 的典型错误是流程中的某个步骤停止以某个合理的值进行回复。

更新

您应该考虑从定义中删除显式replyChannel声明,并从流程末尾删除。这样,您应该会收到对标头的回复。当您配置显式时,不能保证您不会有其他订阅此频道的订阅者会“窃取”您的回复消息。@Gateway.channel("myFlow.output")replyChannelreplyChannel

在参考手册中查看更多信息。


推荐阅读