首页 > 解决方案 > Spring Integration - 使用 JDBCChannelMessageStore 进行聚合

问题描述

我正在使用 Spring boot 2.0.x 和 Spring integration 5.0.x。我可以使用correlationStrategy和releaseStrategy的自定义逻辑成功实现事件聚合,我不需要使用splitter。

    
      @Bean
        public IntegrationFlow aggregateFlow() {
            return f -> f
                    .transform(transformUtils::toSvcInput)// this will transform the input event
                    .aggregate(agg -> agg.outputProcessor(group ->
                                    new ResultAggregator("TBD",group.getMessages()
                                            .stream()
                                            .map(message -> (RusultService) message.getPayload())
                                            .collect(Collectors.toList())))
                            .correlationStrategy(cs -> ((RusultService)cs.getPayload()).getEvent().getApplicationNumber())
                            .releaseStrategy(group -> (group.getMessages().stream()
                                    .allMatch(e -> ((RusultService)e.getPayload()).getEvent()
                                            .getProducts().get(0).getSubProductCode().equals("500")) && group.getMessages().size()==2) ||
                                    (group.getMessages().stream()
                                            .noneMatch(e -> ((RusultService)e.getPayload()).getEvent()
                                                    .getProducts().get(0).getSubProductCode().equals("500")) && group.getMessages().size()==1))
                            .messageStore(jdbcMessageGroupStore)
                            .sendPartialResultOnExpiry(false)                       
                            .expireGroupsUponCompletion(true)
                            .discardChannel("nullChannel")
                            .groupTimeout(600000L))
                    .channel("subsequentFlow.input");
        } 

messageStore 是一个配置了 Oracle 数据源的 JdbcMessageStore。

Q1。上面的配置在集群环境下是不是很好,部署了这个代码的多个实例,还有其他参数需要配置吗?

Q2。如果我更改相关策略/释放策略逻辑并重新部署应用程序,消息存储中的先前消息会卡住,即使在超时后也不会释放(过期)。任何其他解决方法?

Q3。如果我使用相同的消息存储创建另一个具有不同自定义逻辑的聚合器,会有什么问题吗?

Q4。如何使用 JdbcChannelMessageStore 实现相同的聚合(使用自定义 releaseStrategy 和correlationStrategy)(这是正确的用例),您有示例项目吗?

我计划使用注释实现相同的聚合器 bean(带有自定义 releaseStrategy 和相关逻辑),并使其可插入,因为我的不同微服务具有不同的自定义聚合逻辑。

我很想知道第四季度的答案,感谢您的意见,谢谢。

标签: spring-integrationaggregationspring-integration-dsl

解决方案


  1. 它看起来是正确的。
  2. 这是一个错误;我打开了一个问题。您可以使用 aMessageGroupStoreReaper而不是使用组超时来解决它。
  3. 不; 只要它具有不同的相关算法(但请参阅我在问题中的注释)。
  4. 通道商店不是为在这里使用而设计的;它是对 JDBC 表支持通道的优化。

推荐阅读