spring-integration - Spring集成:如何在MessageChain中间实现JDBCOutboundGateway?
问题描述
对我来说,这似乎是一个简单的问题,可能会在所有地方复制。MessageHandlerChain 的一个非常基本的应用程序,可能只使用开箱即用的功能。
从概念上讲,我需要的是:
(1) Polled JDBC reader (sets parameters for integration pass)
|
V
(2) JDBC Reader (uses input from (1) to fetch data to feed through channel
|
V
(3) JDBC writer (writes data fetched by (2) to target)
|
V
(4) JDBC writer (writes additional data from the original parameters fetched in (1))
我认为我需要的是
Flow:
From: JdbcPollingChannelAdapter (setup adapter)
Handler: messageHandlerChain
Handlers (
JdbcPollingChannelAdapter (inbound adapter)
JdbcOutboundGateway (outbound adapter)
JdbcOutboundGateway (cleanup gateway)
)
JdbcPollingChannelAdapter 没有实现MessageHandler API,所以我很茫然如何根据设置步骤读取实际数据。
由于 JdbcOutboundGateway 没有实现 MessageProducer API,所以对于出站适配器需要使用什么我有点茫然。
我应该使用 OOB 类吗?或者我是否需要以某种方式将两个适配器包装在 BridgeHandlers 中以使其工作?
提前致谢
编辑(2) 附加配置问题
设置适配器使用两个时间戳列拉回单行。“丰富的标题”部分正在正确处理它们。
但是,当入站适配器正在执行时,框架将 java.lang.Object 作为参数传入。不是字符串,不是时间戳,而是一个实际的 java.lang.Object,如new Object ()
.
它传递了正确数量的对象,但内容和数据类型丢失了。我是否正确需要配置 ExpressionEvaluatingSqlParameterSourceFactory?
信息:
GenericMessage [payload=[{startTime=2020-11-18 18:01:34.90944, endTime=2020-11-18 18:01:34.90944}], headers={startTime=2020-11-18 18:01:34.90944, id=835edf42-6f69-226a-18f4-ade030c16618, timestamp=1605897225384}]
JdbcOutboundGateway 中的 SQL:
Select t.*, w.operation as "ops" from ADDRESS t
Inner join TT_ADDRESS w
on (t.ADDRESSID = w.ADDRESSID)
And (w.LASTUPDATESTAMP >= :payload.from[0].get("startTime") and w.LASTUPDATESTAMP <= :payload.from[0].get("endTime") )
编辑:添加解决方案 java DSL 配置
private JdbcPollingChannelAdapter setupAdapter; // select only
private JdbcOutboundGateway inboundAdapter; // select only
private JdbcOutboundGateway insertUpdateAdapter; // update only
private JdbcOutboundGateway deleteAdapter; // update only
private JdbcMessageHandler cleanupAdapter; // update only
setFlow(IntegrationFlows
.from(setupAdapter, c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
.enrichHeaders(h -> h.headerExpression("ALC_startTime", "payload.from[0].get(\"ALC_startTime\")")
.headerExpression("ALC_endTime", "payload.from[0].get(\"ALC_endTime\")"))
.handle(inboundAdapter)
.enrichHeaders(h -> h.headerExpression("ALC_operation", "payload.from[0].get(\"ALC_operation\")"))
.handle(insertUpdateAdapter)
.handle(deleteAdapter)
.handle(cleanupAdapter)
.get());
flowContext.registration(flow).id(this.getId().toString()).register();
解决方案
如果您想将原始参数传递到流中的最后一个网关,则需要将这些参数存储在标头中,因为在每一步之后,回复消息的有效负载都会有所不同,并且您将没有原始设置数据那边没有了。这是第一个。
第二:如果您处理IntegrationFlow
Java DSL,则无需担心,messageHandlerChain
因为从概念上讲,IntegrationFlow
它本身就是一个链,但要先进得多。
我不确定为什么您需要JdbcPollingChannelAdapter
在流程开始时根据来自源的传入消息使用 a 来按需请求数据。
你肯定仍然需要使用JdbcOutboundGateway
for justSELECT
模式。这updateQuery
是可选的,因此网关将SELECT
在回复消息的有效负载中执行并为您返回数据。
如果您接下来的两个步骤只是“编写”并且您不关心结果,那么您可能只需查看 aPublishSubscribeChannel
和 twoJdbcMessageHandler
作为它的订阅者。如果没有提供Executor
,PublishSubscribeChannel
它们将被一个接一个地执行。
推荐阅读
- python - 按值拆分列表的快速方法?
- javascript - 如果变量类型没有由 var、let 或 const 初始化,或者仍然分配给数字类型,为什么它总是“字符串”?
- swiftui - SwiftUI:.sheet() 在关闭当前工作表时不会使用预期数据转到上一个视图
- python - python3上的打印语句不再登录谷歌云平台
- .net-core - 如何使用嵌入式调试类型的 VS 2019 代码覆盖率?
- c# - C# 数据表到字典
- c# - C# WebClient 下载某些页面的源代码,但不是全部
- c++ - C++) E0349 没有操作符匹配这些操作数
- .net-core - 如何从 VS 代码覆盖中排除测试程序集?
- swiftui - 无法转换类型 '(Int) -> VStack 的值