首页 > 解决方案 > 使用wireTap时replyChannel超时

问题描述

我们使用wireTap 在流程的不同部分获取时间戳。当引入最新流时,它开始导致replyChannel 超时。根据我从文档中了解到的情况,wireTap 确实拦截了消息并将其发送到辅助通道,同时不影响主流 - 所以它看起来是用来拍摄所述时间戳的快照的完美工具。我们是否为工作使用了错误的组件,或者配置有问题?如果是这样,您建议如何注册此类信息?

例外:

o.s.integration.core.MessagingTemplate   : Failed to receive message from channel 'org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@21845b0d' within timeout: 1000

编码:

@Bean
public MarshallingWebServiceInboundGateway inboundGateway(Jaxb2Marshaller jaxb2Marshaller,
    DefaultSoapHeaderMapper defaultSoapHeaderMapper) {

    final MarshallingWebServiceInboundGateway inboundGateway =
        new MarshallingWebServiceInboundGateway(jaxb2Marshaller);
    inboundGateway.setRequestChannelName(INPUT_CHANNEL_NAME);
    inboundGateway.setHeaderMapper(defaultSoapHeaderMapper);
    return inboundGateway;
}

@Bean
public IntegrationFlow querySynchronous() {
    return IntegrationFlows.from(INPUT_CHANNEL_NAME)
        .enrichHeaders(...) 
        .wireTap(performanceTimestampRegistrator.registerTimestampFlow(SYNC_REQUEST_RECEIVED_TIMESTAMP_NAME))
        .handle(outboundGateway)
        .wireTap(performanceTimestampRegistrator.registerTimestampFlow(SYNC_RESPONSE_RECEIVED_TIMESTAMP_NAME))
        //.transform( m -> m) // for tests - REMOVE
        .get();
}

和时间戳流:

public IntegrationFlow registerTimestampFlow(String asyncRequestReceivedTimestampName) {
    return channel -> channel.handle(
        m -> MetadataStoreConfig.registerFlowTimestamp(m, metadataStore, asyncRequestReceivedTimestampName));
}

这里值得注意的是,如果我取消注释无操作变压器,一切都会突然正常工作,但听起来不对,我想避免这种变通方法。

另一件事是,另一个非常相似的流程可以正常工作,没有任何变通方法。显着的区别在于它使用 kafka 适配器将消息放入 kafka,而不是使用出站网关调用一些 Web 服务。它仍然会生成对句柄的响应(使用 generateResponseFlow()),因此它的行为方式应该相同。这是流程,效果很好:

@Bean
public MarshallingWebServiceInboundGateway workingInboundGateway(Jaxb2Marshaller jaxb2Marshaller,
    DefaultSoapHeaderMapper defaultSoapHeaderMapper, @Qualifier("errorChannel") MessageChannel errorChannel) {

    MarshallingWebServiceInboundGateway aeoNotificationInboundGateway =
        new MarshallingWebServiceInboundGateway(jaxb2Marshaller);
    aeoNotificationInboundGateway.setRequestChannelName(WORKING_INPUT_CHANNEL_NAME);
    aeoNotificationInboundGateway.setHeaderMapper(defaultSoapHeaderMapper);
    aeoNotificationInboundGateway.setErrorChannel(errorChannel);
    return aeoNotificationInboundGateway;
}

@Bean
public IntegrationFlow workingEnqueue() {
    return IntegrationFlows.from(WORKING_INPUT_CHANNEL_NAME)
        .enrichHeaders(...)
        .wireTap(performanceTimestampRegistrator
            .registerTimestampFlow(ASYNC_REQUEST_RECEIVED_TIMESTAMP_NAME))
        .filter(...)
        .filter(...)
        .publishSubscribeChannel(channel -> channel
            .subscribe(sendToKafkaFlow())
            .subscribe(generateResponseFlow()))
        .wireTap(performanceTimestampRegistrator
            .registerTimestampFlow(ASYNC_REQUEST_ENQUEUED_TIMESTAMP_NAME))
        .get();
}

那么,wireTap 是最后一个组件就没有问题了,replyChannel 及时正确收到了响应,没有任何变通办法。

标签: springspring-integrationspring-integration-dsl

解决方案


该行为是预期的。当流尾使用wireTap()(或log())时,默认不回复。由于我们无法假设您尝试将什么逻辑包含在流定义中,因此我们会尽力使用默认行为 - 流变成单向的、发送后忘记的流:有些人真的要求将其设为非后可回复log()...

要让它仍然回复调用者,您需要bridge()在流的末尾添加一个。在文档中查看更多信息:https ://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-log

它适用于您非常复杂的场景,因为您的订阅者之一publishSubscribeChannel就是generateResponseFlow()带有回复的订阅者。老实说,您需要小心请求-回复行为和这样的publishSubscribeChannel配置。replyChannel只能接受一个回复,如果您希望收到多个订阅者的回复,您会惊讶于这种行为是多么奇怪。

wireTap这个你的配置不是订阅者,它是一个注入到那个的拦截器publishSubscribeChannel。因此,您对相似性的假设具有误导性。窃听后流程结束,但由于其中一个订阅者正在回复,您会得到预期的行为。让我们看一下publishSubscribeChannel并联电路,其中所有连接都独立于其他连接获得电力。他们执行他们的工作而不影响所有其他人。无论如何,这是不同的故事。

总结:要在 之后从流中回复wireTap(),您需要指定一个bridge()回复消息将被正确路由到replyChannel来自调用者的。


推荐阅读