java - Spring Integration DSL ScatterGather 流块
问题描述
我有一个集成流,它执行分散收集操作,该操作命中多个返回 JSON 的 HTTP 端点。然后将结果聚合到单个 JSON 对象中。流程是这样的
@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
.handle(HeaderPrinter::headerPrinter)
.enrichHeaders(httpRequestHeaderEnricher())
.scatterGather(
scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.applySequence(true),
gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
)
.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")
.channel("myFlow.output");
}
我使用如下声明的网关开始流程
@MessagingGateway
public interface IMyGateway {
@Gateway(requestChannel = "myFlow.input", replyChannel = "myFlow.output")
MyResult startFlow(@Payload String payload, @Header("header1") String header1, @Header("header2") String header2);
}
我遇到的问题是整个流程阻塞并且网关超时。我在两个服务调用IMyService::handleAggregatedJson和IMyOutherService::handleMyServiceResult中设置了断点,它们都在运行,但输出永远不会到达网关的回复通道。如果我删除了最后两个句柄操作,则流程通常通过网关返回结果。
我在流被阻塞时查看了堆栈跟踪,我可以看到运行流的线程正在等待锁定
java.lang.Thread.State:在 java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 在 java.util.concurrent 在 sun.misc.Unsafe.park(Unsafe.java:-1) 等待.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) 在 java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) 在 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.acquireSharedInterruptively(AbstractQueuedSynchronizer.1 ) 在 java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) 在 org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:308) 在 org.springframework.messaging.core.GenericMessagingTemplate$ TemporaryReplyChannel.receive(GenericMessagingTemplate.java:300) 在 org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:234) 在 org.springframework.messaging.core.GenericMessagingTemplate 的 org.springframework.messaging.core.GenericMessagingTemplate.doReceive(GenericMessagingTemplate.java:201) .doSendAndReceive(GenericMessagingTemplate.java:47) at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:45) at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) at org. springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:38) at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:95) at org.springframework.messaging.core。AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:85) at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:487) at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:461) at org .springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:520) at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:469) at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean .java:460) 在 org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) 在 org.springframework.aop.framework.JdkDynamicAopProxy。在 com.sun.proxy.$Proxy116.startFlow 调用(JdkDynamicAopProxy.java:212)(未知来源:-1)
我怀疑如果流程花费的时间超过 X 时间,那么它将阻塞。我尝试在流和网关之间放置一个集合通道,但它似乎不起作用。
关于导致超时问题的任何想法?
附录:我一直在修改代码并删除网关上的返回类型,流上的最后一个 .channel 调用似乎停止阻塞它。
以下工作正常
@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
.handle(HeaderPrinter::headerPrinter)
.enrichHeaders(httpRequestHeaderEnricher())
.scatterGather(
scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.applySequence(true),
gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
)
.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")
.handle(m -> {
log.info("Flow completed successfully, payload as expected:" + payload);
});
}
解决方案
我想知道你所有的
.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")
收集后返回一些值。request-reply 的典型错误是流程中的某个步骤停止以某个合理的值进行回复。
更新
您应该考虑从定义中删除显式replyChannel
声明,并从流程末尾删除。这样,您应该会收到对标头的回复。当您配置显式时,不能保证您不会有其他订阅此频道的订阅者会“窃取”您的回复消息。@Gateway
.channel("myFlow.output")
replyChannel
replyChannel
在参考手册中查看更多信息。
推荐阅读
- javascript - 在 js 文件中使用和创建的 div 中添加 h1 标签
- java - Quarkus Panache OneToMany 关系未保存在数据库中
- c# - 使用继承的构建器模式
- python - 将一些独立应用程序集成到 Web 环境中的最佳理论上的方法是什么?
- ruby-on-rails - 在 ruby 中添加到 id 的数据库列表中
- azure - 是否可以在 azcopy 中使用正则表达式?
- macos - 如果此系统没有任何打印机,如何在 Mac 上使用 QPageSetupDialog?
- java - 修改值后键值对的颠簸转换规范
- python - 我如何在python中创建检查列表是否为向量的函数
- python - 如何根据python中最近的时间戳左连接两个长度不同的数据帧并且不丢失任何行?