parallel-processing - apache camel - 并行处理器然后加入输出
问题描述
我希望对两个处理器进行并行处理(从不同的来源获取不同的信息),然后当两者都完成时,我想访问两个输出以进行进一步处理(例如比较)。
某种东西:
from("direct:start)
.processor("process1")
.processor("process2")
.to("direct:compare");
除了我需要“比较”端点的输出process1
和process2
可用的输出。
解决方案
这是使用多播和聚合策略实现的一种方式,
public class App {
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
context.addRoutes(myRoute());
context.startRoute("start");
context.start();
ProducerTemplate producerTemplate = context.createProducerTemplate();
producerTemplate.sendBody("direct:start", null);
Thread.sleep(10_000);
context.stop();
}
private static RouteBuilder myRoute() {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start").routeId("start")
.multicast(new MyAggregationStrategy())
.parallelProcessing()
.to("direct:process1", "direct:process2", "direct:process3")
.end()
.to("direct:endgame");
from("direct:process1")
.process(e -> {
ArrayList<String> body = Lists.newArrayList("a", "b", "c");
e.getIn().setBody(body);
});
from("direct:process2")
.process(e -> {
ArrayList<String> body = Lists.newArrayList("1", "2", "3");
e.getIn().setBody(body);
});
from("direct:process3")
.process(e -> {
ArrayList<String> body = Lists.newArrayList("@", "#", "$");
e.getIn().setBody(body);
});
from("direct:endgame")
.process(e -> {
log.info(" This final result : " + e.getIn().getBody());
});
}
};
}
}
//This is where we can aggregate results of the process which is running in parallel
class MyAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
ArrayList<Object> objects = Lists.newArrayList();
if (oldExchange == null) {
return newExchange;
}
Object o = oldExchange.getIn().getBody();
Object n = newExchange.getIn().getBody();
objects.add(o);
objects.add(n);
newExchange.getIn().setBody(objects);
return newExchange;
}
}
推荐阅读
- javascript - 多个字符串的一个正则表达式
- ios - 无法在已安装的 pod 类中访问我的项目类
- android - 为什么我不能让我的应用程序留在我的设备上?
- ios - 为什么我的静态库会出现 iOS 链接器错误?
- angular - 如何在焦点上显示自动完成选项
- excel - Excel 根据多个搜索条件查找多个唯一值
- kubernetes - Openshift Origin 3.6 删除 pod 总是与终结器一起挂起:-foregroundDeletion
- python - Selenium webdriver 无法在 Jupyter Notebook 的一个单元格中定位元素并将其定位在另一个单元格中
- bash - 如何尾随-F,但只保留最后5行的滚动输出
- kubernetes - 在最新的 0.7 版本中,OpenEBS Jiva 卷大小的单位是否有任何变化?