首页 > 解决方案 > IntegrationFlowDefinition.aggregate 不起作用:也许 CorrelationStrategy 失败了?

问题描述

错误消息:原因:java.lang.IllegalStateException:不允许空关联。也许 CorrelationStrategy 失败了?

我的实现,

    @Bean
    public IntegrationFlow start() {
        return IntegrationFlows
                .from("getOrders")
                .split()
                .publishSubscribeChannel(c -> c.subscribe(s -> s.channel(q -> q.queue(1))
                        .<Order, Message<?>>transform(p -> MessageBuilder.withPayload(new Item(p.getItems())).setHeader(ORDERID, p.getOrderId()).build())
                        .split(Item.class, Item::getItems)
                        .transform() // let's assume, an object created for each item, let's say ItemProperty to the object.
                                     //  Transform returns message; MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build();
                        .aggregate() // so, here aggregate method needs to aggregate ItemProperties.
                        .handle() // handler gets List<ItemProperty> as an input.
      ))
      .get();
    }

两个分离器工作正常。我还测试了第二个分配器后的变压器,工作正常。但是,当涉及到聚合时,它就失败了。我在这里缺少什么?

标签: spring-integrationspring-integration-dsl

解决方案


您错过了这样一个事实,transformer即按原样处理整个消息的端点类型。如果您自己创建消息,它不会修改它。因此,MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build();您只会错过拆分器后的重要序列详细信息标题。因此,之后的聚合器不知道如何处理您的消息,因为您将其配置为默认关联策略,但您没有在消息中提供相应的标头。

从技术上讲,我认为没有理由在那里手动创建消息:简单return createItemProperty(getItemName, getItemId);对您来说应该足够了。该框架将代表您创建消息,并复制相应的请求消息标头。

如果您仍然认为您需要在该转换中自己创建一条消息,那么您需要考虑copyHeaders()MessageBuilder请求消息中携带所需的序列详细信息标头。


推荐阅读