spring-integration - IntegrationFlowDefinition.aggregate 不起作用:也许 CorrelationStrategy 失败了?
问题描述
错误消息:原因:java.lang.IllegalStateException:不允许空关联。也许 CorrelationStrategy 失败了?
我的实现,
@Bean
public IntegrationFlow start() {
return IntegrationFlows
.from("getOrders")
.split()
.publishSubscribeChannel(c -> c.subscribe(s -> s.channel(q -> q.queue(1))
.<Order, Message<?>>transform(p -> MessageBuilder.withPayload(new Item(p.getItems())).setHeader(ORDERID, p.getOrderId()).build())
.split(Item.class, Item::getItems)
.transform() // let's assume, an object created for each item, let's say ItemProperty to the object.
// Transform returns message; MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build();
.aggregate() // so, here aggregate method needs to aggregate ItemProperties.
.handle() // handler gets List<ItemProperty> as an input.
))
.get();
}
两个分离器工作正常。我还测试了第二个分配器后的变压器,工作正常。但是,当涉及到聚合时,它就失败了。我在这里缺少什么?
解决方案
您错过了这样一个事实,transformer
即按原样处理整个消息的端点类型。如果您自己创建消息,它不会修改它。因此,MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build();
您只会错过拆分器后的重要序列详细信息标题。因此,之后的聚合器不知道如何处理您的消息,因为您将其配置为默认关联策略,但您没有在消息中提供相应的标头。
从技术上讲,我认为没有理由在那里手动创建消息:简单return createItemProperty(getItemName, getItemId);
对您来说应该足够了。该框架将代表您创建消息,并复制相应的请求消息标头。
如果您仍然认为您需要在该转换中自己创建一条消息,那么您需要考虑copyHeaders()
从MessageBuilder
请求消息中携带所需的序列详细信息标头。
推荐阅读
- scipy - 傅里叶变换将信号分解为多个频率
- c# - 仅当每个任务成功完成/验证时才链接基于任务的方法
- python - python 的 VS 代码终端未显示输出。帮我修一下
- forms - 存储多个后端过滤器信息
- python - 如果不同,如何用另一个列表键值替换键值
- python - 如何比较Python中不同列表的第n个元素?
- reactjs - 如何使用 HOOK 使多个 Collapse 处于动态状态?
- c# - 将 Auth0 用于 Asp.Net MVC 项目如何将用户直接重定向到注册页面
- reactjs - 不必要的计算属性 ['@media (max-width:780px)'] 发现 no-useless-computed-key
- javascript - 如何将不同类型的表单输入附加到 formData()?