首页 > 解决方案 > apache camel - 并行处理器然后加入输出

问题描述

我希望对两个处理器进行并行处理(从不同的来源获取不同的信息),然后当两者都完成时,我想访问两个输出以进行进一步处理(例如比较)。

某种东西:

from("direct:start)
            .processor("process1")
            .processor("process2")
      .to("direct:compare");

除了我需要“比较”端点的输出process1process2可用的输出。

标签: parallel-processingapache-camelspring-camel

解决方案


这是使用多播和聚合策略实现的一种方式,

public class App {
  public static void main(String[] args) throws Exception {

    CamelContext context = new DefaultCamelContext();
    context.addRoutes(myRoute());
    context.startRoute("start");
    context.start();
    ProducerTemplate producerTemplate = context.createProducerTemplate();
    producerTemplate.sendBody("direct:start", null);
    Thread.sleep(10_000);
    context.stop();

  }

  private static RouteBuilder myRoute() {
    return new RouteBuilder() {
      @Override
      public void configure() throws Exception {
        from("direct:start").routeId("start")
                .multicast(new MyAggregationStrategy())
                .parallelProcessing()
                .to("direct:process1", "direct:process2", "direct:process3")
                .end()
        .to("direct:endgame");

        from("direct:process1")
                .process(e -> {
                  ArrayList<String> body = Lists.newArrayList("a", "b", "c");
                  e.getIn().setBody(body);
                });

        from("direct:process2")
                .process(e -> {
                  ArrayList<String> body = Lists.newArrayList("1", "2", "3");
                  e.getIn().setBody(body);
                });

        from("direct:process3")
                .process(e -> {
                  ArrayList<String> body = Lists.newArrayList("@", "#", "$");
                  e.getIn().setBody(body);
                });


        from("direct:endgame")
                .process(e -> {
                  log.info(" This final result : " + e.getIn().getBody());
                });
      }
    };
  }
}

//This is where we can aggregate results of the process which is running in parallel
class MyAggregationStrategy implements AggregationStrategy {

  @Override
  public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    ArrayList<Object> objects = Lists.newArrayList();
    if (oldExchange == null) {
      return newExchange;
    }

    Object o = oldExchange.getIn().getBody();
    Object n = newExchange.getIn().getBody();

    objects.add(o);
    objects.add(n);

    newExchange.getIn().setBody(objects);

    return newExchange;
  }
}

推荐阅读