首页 > 解决方案 > Spring集成:如何在MessageChain中间实现JDBCOutboundGateway?

问题描述

对我来说,这似乎是一个简单的问题,可能会在所有地方复制。MessageHandlerChain 的一个非常基本的应用程序,可能只使用开箱即用的功能。

从概念上讲,我需要的是:

(1) Polled JDBC reader (sets parameters for integration pass)
    |
    V
(2) JDBC Reader (uses input from (1) to fetch data to feed through channel
    |
    V
(3) JDBC writer (writes data fetched by (2) to target)
    |
    V
(4) JDBC writer (writes additional data from the original parameters fetched in (1))

我认为我需要的是

Flow:
From: JdbcPollingChannelAdapter (setup adapter)
Handler: messageHandlerChain
    Handlers (
       JdbcPollingChannelAdapter (inbound adapter)
       JdbcOutboundGateway (outbound adapter)
       JdbcOutboundGateway (cleanup gateway)
    )

JdbcPollingChannelAdapter 没有实现MessageHandler API,所以我很茫然如何根据设置步骤读取实际数据。

由于 JdbcOutboundGateway 没有实现 MessageProducer API,所以对于出站适配器需要使用什么我有点茫然。

我应该使用 OOB 类吗?或者我是否需要以某种方式将两个适配器包装在 BridgeHandlers 中以使其工作?

提前致谢


编辑(2) 附加配置问题

设置适配器使用两个时间戳列拉回单行。“丰富的标题”部分正在正确处理它们。

但是,当入站适配器正在执行时,框架将 java.lang.Object 作为参数传入。不是字符串,不是时间戳,而是一个实际的 java.lang.Object,如new Object ().

它传递了正确数量的对象,但内容和数据类型丢失了。我是否正确需要配置 ExpressionEvaluatingSqlParameterSourceFactory?

信息:

GenericMessage [payload=[{startTime=2020-11-18 18:01:34.90944, endTime=2020-11-18 18:01:34.90944}], headers={startTime=2020-11-18 18:01:34.90944, id=835edf42-6f69-226a-18f4-ade030c16618, timestamp=1605897225384}]

JdbcOutboundGateway 中的 SQL:

Select t.*, w.operation as "ops" from ADDRESS t
Inner join TT_ADDRESS w 
  on (t.ADDRESSID = w.ADDRESSID)
  And (w.LASTUPDATESTAMP >= :payload.from[0].get("startTime") and w.LASTUPDATESTAMP <= :payload.from[0].get("endTime") )

编辑:添加解决方案 java DSL 配置

private JdbcPollingChannelAdapter setupAdapter; // select only
private JdbcOutboundGateway inboundAdapter; // select only
private JdbcOutboundGateway insertUpdateAdapter; // update only
private JdbcOutboundGateway deleteAdapter; // update only
private JdbcMessageHandler cleanupAdapter; // update only

        setFlow(IntegrationFlows
            .from(setupAdapter, c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
            .enrichHeaders(h -> h.headerExpression("ALC_startTime", "payload.from[0].get(\"ALC_startTime\")")
                    .headerExpression("ALC_endTime", "payload.from[0].get(\"ALC_endTime\")"))
            .handle(inboundAdapter)
            .enrichHeaders(h -> h.headerExpression("ALC_operation", "payload.from[0].get(\"ALC_operation\")"))
            .handle(insertUpdateAdapter)
            .handle(deleteAdapter)
            .handle(cleanupAdapter)
            .get());

flowContext.registration(flow).id(this.getId().toString()).register();

标签: spring-integration

解决方案


如果您想将原始参数传递到流中的最后一个网关,则需要将这些参数存储在标头中,因为在每一步之后,回复消息的有效负载都会有所不同,并且您将没有原始设置数据那边没有了。这是第一个。

第二:如果您处理IntegrationFlowJava DSL,则无需担心,messageHandlerChain因为从概念上讲,IntegrationFlow它本身就是一个链,但要先进得多。

我不确定为什么您需要JdbcPollingChannelAdapter在流程开始时根据来自源的传入消息使用 a 来按需请求数据。

你肯定仍然需要使用JdbcOutboundGatewayfor justSELECT模式。这updateQuery是可选的,因此网关将SELECT在回复消息的有效负载中执行并为您返回数据。

如果您接下来的两个步骤只是“编写”并且您不关心结果,那么您可能只需查看 aPublishSubscribeChannel和 twoJdbcMessageHandler作为它的订阅者。如果没有提供ExecutorPublishSubscribeChannel它们将被一个接一个地执行。


推荐阅读