spring - 使用wireTap时replyChannel超时
问题描述
我们使用wireTap 在流程的不同部分获取时间戳。当引入最新流时,它开始导致replyChannel 超时。根据我从文档中了解到的情况,wireTap 确实拦截了消息并将其发送到辅助通道,同时不影响主流 - 所以它看起来是用来拍摄所述时间戳的快照的完美工具。我们是否为工作使用了错误的组件,或者配置有问题?如果是这样,您建议如何注册此类信息?
例外:
o.s.integration.core.MessagingTemplate : Failed to receive message from channel 'org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@21845b0d' within timeout: 1000
编码:
@Bean
public MarshallingWebServiceInboundGateway inboundGateway(Jaxb2Marshaller jaxb2Marshaller,
DefaultSoapHeaderMapper defaultSoapHeaderMapper) {
final MarshallingWebServiceInboundGateway inboundGateway =
new MarshallingWebServiceInboundGateway(jaxb2Marshaller);
inboundGateway.setRequestChannelName(INPUT_CHANNEL_NAME);
inboundGateway.setHeaderMapper(defaultSoapHeaderMapper);
return inboundGateway;
}
@Bean
public IntegrationFlow querySynchronous() {
return IntegrationFlows.from(INPUT_CHANNEL_NAME)
.enrichHeaders(...)
.wireTap(performanceTimestampRegistrator.registerTimestampFlow(SYNC_REQUEST_RECEIVED_TIMESTAMP_NAME))
.handle(outboundGateway)
.wireTap(performanceTimestampRegistrator.registerTimestampFlow(SYNC_RESPONSE_RECEIVED_TIMESTAMP_NAME))
//.transform( m -> m) // for tests - REMOVE
.get();
}
和时间戳流:
public IntegrationFlow registerTimestampFlow(String asyncRequestReceivedTimestampName) {
return channel -> channel.handle(
m -> MetadataStoreConfig.registerFlowTimestamp(m, metadataStore, asyncRequestReceivedTimestampName));
}
这里值得注意的是,如果我取消注释无操作变压器,一切都会突然正常工作,但听起来不对,我想避免这种变通方法。
另一件事是,另一个非常相似的流程可以正常工作,没有任何变通方法。显着的区别在于它使用 kafka 适配器将消息放入 kafka,而不是使用出站网关调用一些 Web 服务。它仍然会生成对句柄的响应(使用 generateResponseFlow()),因此它的行为方式应该相同。这是流程,效果很好:
@Bean
public MarshallingWebServiceInboundGateway workingInboundGateway(Jaxb2Marshaller jaxb2Marshaller,
DefaultSoapHeaderMapper defaultSoapHeaderMapper, @Qualifier("errorChannel") MessageChannel errorChannel) {
MarshallingWebServiceInboundGateway aeoNotificationInboundGateway =
new MarshallingWebServiceInboundGateway(jaxb2Marshaller);
aeoNotificationInboundGateway.setRequestChannelName(WORKING_INPUT_CHANNEL_NAME);
aeoNotificationInboundGateway.setHeaderMapper(defaultSoapHeaderMapper);
aeoNotificationInboundGateway.setErrorChannel(errorChannel);
return aeoNotificationInboundGateway;
}
@Bean
public IntegrationFlow workingEnqueue() {
return IntegrationFlows.from(WORKING_INPUT_CHANNEL_NAME)
.enrichHeaders(...)
.wireTap(performanceTimestampRegistrator
.registerTimestampFlow(ASYNC_REQUEST_RECEIVED_TIMESTAMP_NAME))
.filter(...)
.filter(...)
.publishSubscribeChannel(channel -> channel
.subscribe(sendToKafkaFlow())
.subscribe(generateResponseFlow()))
.wireTap(performanceTimestampRegistrator
.registerTimestampFlow(ASYNC_REQUEST_ENQUEUED_TIMESTAMP_NAME))
.get();
}
那么,wireTap 是最后一个组件就没有问题了,replyChannel 及时正确收到了响应,没有任何变通办法。
解决方案
该行为是预期的。当流尾使用wireTap()
(或log()
)时,默认不回复。由于我们无法假设您尝试将什么逻辑包含在流定义中,因此我们会尽力使用默认行为 - 流变成单向的、发送后忘记的流:有些人真的要求将其设为非后可回复log()
...
要让它仍然回复调用者,您需要bridge()
在流的末尾添加一个。在文档中查看更多信息:https ://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-log
它适用于您非常复杂的场景,因为您的订阅者之一publishSubscribeChannel
就是generateResponseFlow()
带有回复的订阅者。老实说,您需要小心请求-回复行为和这样的publishSubscribeChannel
配置。replyChannel
只能接受一个回复,如果您希望收到多个订阅者的回复,您会惊讶于这种行为是多么奇怪。
在wireTap
这个你的配置不是订阅者,它是一个注入到那个的拦截器publishSubscribeChannel
。因此,您对相似性的假设具有误导性。窃听后流程结束,但由于其中一个订阅者正在回复,您会得到预期的行为。让我们看一下publishSubscribeChannel
并联电路,其中所有连接都独立于其他连接获得电力。他们执行他们的工作而不影响所有其他人。无论如何,这是不同的故事。
总结:要在 之后从流中回复wireTap()
,您需要指定一个bridge()
回复消息将被正确路由到replyChannel
来自调用者的。
推荐阅读
- lighthouse - PagespeedInsights 和 Lighthouse 的区别
- python - 如何将多个 MDSwiperItem 添加到 MDSwiper:]?
- android - 如何使用 AR Core 在同一个活动中渲染多个 AR 模型?
- ios - 从剪贴板粘贴不适用于 iOS 模拟器上的 React Native
- javascript - 我可以用普通函数调用异步函数吗?Javascript
- javascript - 如何以开玩笑的方式从 jquery 选择器中模拟 div 长度属性
- angular - 具有映射值的 Ng 自动完成过滤器列表功能
- django - “for”语句应使用“for x in y”格式:对于 allPost1 和 allPost2 以及 allPost3 和 allPost4 中的 allPosts
- angularjs - 在 Angular 项目中使用 fluent-ffmpeg
- c# - 在 C# 中的 DataGridView 中提交数据