首页 > 解决方案 > Spring集成scatterGather聚合器无法返回结果

问题描述

我们正在更新到最新版本的 Spring 集成,并面临聚合器返回结果的问题

例如将字符串转换为大小写并将它们聚合到一个列表中

    public IntegrationFlow stringTransformer() {
        return f -> f
                .scatterGather(
                        scatterer -> scatterer
                                .applySequence(true)
                                .recipientFlow(f1 -> f1.<List<String>, List<String>>transform(p -> p.stream().map(String::toUpperCase)
                                        .collect(Collectors.toList())))
                                .recipientFlow(f2 -> f2.<List<String>, List<String>>transform(p -> p.stream().map(String::toLowerCase)
                                        .collect(Collectors.toList()))),
                        gatherer -> gatherer.expireGroupsUponCompletion(true)
                                .outputProcessor(messageGroup -> {
                                    List<String> finalList;
                                     finalList = messageGroup.getMessages().stream()
                                             .map(Message::getPayload)
                                             .map(p -> (List<String>)p)
                                             .flatMap(Collection::stream)
                                             .collect(Collectors.toList());
                                    return finalList;
                                }),
                        s -> s.errorChannel("scatterGatherErrorChannel"));
    }

1.2.2聚合器的版本spring-integration-java-dsl没有任何东西,它工作正常。

5.5.2代码期望/检查聚合器是否返回 Type Messages

导致异常:

java.lang.IllegalArgumentException: The expected collection of Messages contains non-Message element: class java.lang.String: class java.lang.String
    at org.springframework.util.Assert.assignableCheckFailed(Assert.java:720) ~[spring-core-5.3.9.jar:5.3.9]
    at org.springframework.util.Assert.isAssignable(Assert.java:670) ~[spring-core-5.3.9.jar:5.3.9]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.verifyResultCollectionConsistsOfMessages(AbstractCorrelatingMessageHandler.java:943) ~[spring-integration-core-5.5.2.jar:5.5.2]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:902) ~[spring-integration-core-5.5.2.jar:5.5.2]

MessageGroup 在组的默认相关策略和发布策略上。

是否需要为聚合器设置任何其他标志而不将其视为消息?

标签: springspring-integrationspring-integration-dsl

解决方案


我不记得为什么需要这种改变。

建议你在.map(list -> MessageBuilder.withPayload(list));后面加上.collect()

它需要是 Spring Integration 的MessageBuilder- 而不是来自 spring-messaging 的。


推荐阅读