首页 > 解决方案 > 无法从聚合路由丰富交换

问题描述

我在理解为什么聚合器没有从用于丰富原始交换的子路由接收正确的交换时遇到问题。我能说的是,日志在路由结束时正确打印,但在父路由中,我只在子路由中从早期获得消息正文。话虽如此,这里是路线的浓缩摘录。

this.from("direct:fetchPartnersForAggregation")
.to("sql:select * from ...")
.process(exchange -> {
List<Map<String, Object>> payload = exchange.getIn().getBody(ArrayList.class);
exchange.getIn().setHeader("numOfPartners", payload.size());
if(payload.size() < 1 || (payload.size() == 1 && payload.get(0).get("process_id")== null))
  exchange.getIn().setBody(null);
})
.choice()
.when(not(emptyBody))
    .split(body())
    .streaming()
    .setHeader("partner_id", simple("${body[process_id]}"))
    .to("sql:select ....")
    .setHeader(Constants.EXCHANGE_HEADERS.OBJECT_TYPE, simple("partners"))
    .to("log:partner_extracted?showBody=true&showHeaders=true")
    .process(appIdCollator)
    .process(partnerBuilder)
    .aggregate(constant(true), partnersAggregator)
    .completionPredicate(new PartnerSizePredicate())
    .to("log:partners_collated?showBody=true&showHeaders=true")  //The log prints everything correctly
.end();


//In my other route:
...
.enrich("direct:fetchPartnersForAggregation", partnersAggregator).id("partners_added") <========== The 'newExchange' in this partnersAggregator contains the body returned in the first sql:Select in the aggregation route.

标签: apache-camel

解决方案


我没有意识到拆分器有自己的聚合策略构建,可以根据odcumentation覆盖。我只是移动了我的聚合器来覆盖它,我现在有了正确的交换:

this.from("direct:fetchPartnersForAggregation")
.to("sql:select * from ...")
.process(exchange -> {
List<Map<String, Object>> payload = exchange.getIn().getBody(ArrayList.class);
exchange.getIn().setHeader("numOfPartners", payload.size());
if(payload.size() < 1 || (payload.size() == 1 && payload.get(0).get("process_id")== null))
  exchange.getIn().setBody(null);
})
.choice()
.when(emptyBody)
    .stop()
.otherwise()
    .split(body(), partnersAggregator)
    .streaming()
    .setHeader("partner_id", simple("${body[process_id]}"))
    .to("sql:select ....")
    .setHeader(Constants.EXCHANGE_HEADERS.OBJECT_TYPE, simple("partners"))
    .to("log:partner_extracted?showBody=true&showHeaders=true")
    .process(appIdCollator)
    .process(partnerBuilder)
    .to("log:partners_collated?showBody=true&showHeaders=true")
.end();

推荐阅读