spring-integration - Spring Integration - 使用 JDBCChannelMessageStore 进行聚合
问题描述
我正在使用 Spring boot 2.0.x 和 Spring integration 5.0.x。我可以使用correlationStrategy和releaseStrategy的自定义逻辑成功实现事件聚合,我不需要使用splitter。
@Bean
public IntegrationFlow aggregateFlow() {
return f -> f
.transform(transformUtils::toSvcInput)// this will transform the input event
.aggregate(agg -> agg.outputProcessor(group ->
new ResultAggregator("TBD",group.getMessages()
.stream()
.map(message -> (RusultService) message.getPayload())
.collect(Collectors.toList())))
.correlationStrategy(cs -> ((RusultService)cs.getPayload()).getEvent().getApplicationNumber())
.releaseStrategy(group -> (group.getMessages().stream()
.allMatch(e -> ((RusultService)e.getPayload()).getEvent()
.getProducts().get(0).getSubProductCode().equals("500")) && group.getMessages().size()==2) ||
(group.getMessages().stream()
.noneMatch(e -> ((RusultService)e.getPayload()).getEvent()
.getProducts().get(0).getSubProductCode().equals("500")) && group.getMessages().size()==1))
.messageStore(jdbcMessageGroupStore)
.sendPartialResultOnExpiry(false)
.expireGroupsUponCompletion(true)
.discardChannel("nullChannel")
.groupTimeout(600000L))
.channel("subsequentFlow.input");
}
messageStore 是一个配置了 Oracle 数据源的 JdbcMessageStore。
Q1。上面的配置在集群环境下是不是很好,部署了这个代码的多个实例,还有其他参数需要配置吗?
Q2。如果我更改相关策略/释放策略逻辑并重新部署应用程序,消息存储中的先前消息会卡住,即使在超时后也不会释放(过期)。任何其他解决方法?
Q3。如果我使用相同的消息存储创建另一个具有不同自定义逻辑的聚合器,会有什么问题吗?
Q4。如何使用 JdbcChannelMessageStore 实现相同的聚合(使用自定义 releaseStrategy 和correlationStrategy)(这是正确的用例),您有示例项目吗?
我计划使用注释实现相同的聚合器 bean(带有自定义 releaseStrategy 和相关逻辑),并使其可插入,因为我的不同微服务具有不同的自定义聚合逻辑。
我很想知道第四季度的答案,感谢您的意见,谢谢。
解决方案
- 它看起来是正确的。
- 这是一个错误;我打开了一个问题。您可以使用 a
MessageGroupStoreReaper
而不是使用组超时来解决它。 - 不; 只要它具有不同的相关算法(但请参阅我在问题中的注释)。
- 通道商店不是为在这里使用而设计的;它是对 JDBC 表支持通道的优化。
推荐阅读
- c# - C# .NET Core 3.1 OData:从 https 到 http 的意外更改 URI
- centrifuge - 连接到专用通道 сentrifuge 时如何更正错误?
- c - 插入链表出错
- scala - 使用列表中的多列过滤空格或空值
- .net - 如何使用流利的验证将来自 asp.net Web 应用程序的输入错误输入到我的 Index.cshtml 视图中?
- javascript - 单击时手风琴标题不起作用
- python - 忽略 NaN 的 Pandas 分组和转换
- postgresql - Postgres Jsonb 聚合
- python - URL 错误 404 找不到 Django 页面错误 x.html
- c++ - 想破坏我做的堆栈