首页 > 解决方案 > 抽象出站网关实现的最佳方法

问题描述

我的 spring-integration 应用程序需要能够通过简单的配置更改在 Kafka 和旧消息传递库(tibco 集合点,spring-integration 不提供任何默认出站网关实现)之间按需切换。
遗留消息传递库提供了一些基本的请求/回复方法

Class LegacyTransport {
    Object request(Object query,String topic);
}

我正在尝试找出抽象两个消息出站网关(Kafka 和 legacy)的最佳方法,以便我可以在我的主要 IntegrationFlow 中将一个交换为另一个(通过简单的配置更改)。

我目前的想法是使用以下方法作为我的主要 IntegrationFlow 的一部分:

IntegrationFlowDefinition.gateway(IntegrationFlow flow)

首先创建 2 个具有相同名称的条件子流工厂 bean,用于包装我的每个消息传递网关:

@ConditionalOnProperty(name="messaging",havingValue=[TIBCO/KAFKA])
@Bean
Function<String,IntegrationFlow> gatewaySubflowFactory() {
    return (String topic) -> ((IntegrationFlowDefinition<?> f) ->
        f.handle(
            [messaging library specific implementation here]
        ));
}

然后在我的主要 IntegrationFlow 中使用该 bean:

@Bean
public IntegrationFlow mainFlow(Function<String,IntegrationFlow> gatewaySubflowFactory)

    return IntegrationFlows.from(INPUT_CHANNEL)
        ...
        [do some useful processing here]
        ...
        .gateway(gatewaySubflowFactory.apply("some_topic"))
        ...
        [do more useful stuff with gateway output here]
        ...
        .get()

有没有更好(更简单?)的方法?
非常感谢您的专业知识、想法和时间。
此致

标签: spring-integration

解决方案


任何出站网关只是更通用的服务激活器模式的特定实现。因此,您LegacyTransport.request()可以包装到服务激活器配置中。这是第一个。

第二:不要忘记。永远不要忘记。Spring Integration 中的一等公民之一是MessageChannel抽象:常规服务激活器,特别是 Kafka 的出站网关 - 与它们交互的主要点无关紧要,是为端点输入配置的消息通道。

因此,您的 Kafka 和 Tibco 流都可以从同一个通道开始。您的主要流程只是将其输出发送到该通道。有关IntegrationFlowDefinition.channel()更多信息,请参阅。

这两个特定的流程都可以用 来标记,以防止@ConditionalOnProperty它们在运行时出现。

总结一下我的推理,这里有一些配置:

@Bean
public IntegrationFlow mainFlow() {

    return IntegrationFlows.from(INPUT_CHANNEL)
        ...
        [do some useful processing here]
        ...
        .gateway(OUTBOUND_GATEWAY_CHANNEL)
        ...
        [do more useful stuff with gateway output here]
        ...
        .get()

}

@ConditionalOnProperty(name="messaging",havingValue=KAFKA)
@Bean
public IntegrationFlow kafkaFlow() {
   return IntegrationFlows.from(OUTBOUND_GATEWAY_CHANNEL) 
              .handle(Kafka.outboundGateway())
              .get();
}

@ConditionalOnProperty(name="messaging",havingValue=TIBCO)
@Bean
public IntegrationFlow tibcoFlow() {
   return IntegrationFlows.from(OUTBOUND_GATEWAY_CHANNEL) 
              .handle(legacyTransport, "request")
              .get();
}

推荐阅读