Search code examples
parallel-processingapache-camelspring-camel

apache camel - parallel processor then join output


I want to run two processors in parallel (each fetching different information from a different source) and then, once both have completed, have access to both outputs for further processing (e.g. comparisons).

Something of this sort:

from("direct:start)
            .processor("process1")
            .processor("process2")
      .to("direct:compare");

Except that I need the outputs of both process1 and process2 to be available in the "compare" endpoint.


Solution

  • This is one way to achieve it, using multicast with an aggregation strategy:

    /**
     * Standalone demo: fans one message out to three {@code direct:} sub-routes in
     * parallel, merges their replies with {@link MyAggregationStrategy}, and forwards
     * the merged exchange to {@code direct:endgame}.
     */
    public class App {

      /**
       * Starts a Camel context with the demo routes, sends a single message with a
       * {@code null} body to {@code direct:start}, and shuts the context down.
       *
       * @param args ignored
       * @throws Exception if the context cannot be started or the message fails to route
       */
      public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(myRoute());
        try {
          // Routes registered before start() are started together with the context,
          // so no explicit startRoute("start") call is needed.
          context.start();
          ProducerTemplate producerTemplate = context.createProducerTemplate();
          // direct: endpoints are synchronous and the multicast blocks the caller until
          // every branch has been aggregated, so sendBody() only returns once
          // direct:endgame has run; no sleep is required afterwards.
          producerTemplate.sendBody("direct:start", null);
        } finally {
          // Always release the context, even when routing throws.
          context.stop();
        }
      }

      /**
       * Builds the demo routes.
       *
       * <ul>
       *   <li>{@code direct:start} (route id {@code start}) multicasts in parallel to
       *       {@code direct:process1..3}, then sends the aggregated exchange to
       *       {@code direct:endgame}.</li>
       *   <li>Each {@code direct:processN} route replaces the body with a fixed list of
       *       three strings.</li>
       *   <li>{@code direct:endgame} logs the aggregated body.</li>
       * </ul>
       *
       * @return a route builder holding the routes described above
       */
      private static RouteBuilder myRoute() {
        return new RouteBuilder() {
          @Override
          public void configure() throws Exception {
            from("direct:start").routeId("start")
                    .multicast(new MyAggregationStrategy())
                    .parallelProcessing()
                    .to("direct:process1", "direct:process2", "direct:process3")
                    .end()
            .to("direct:endgame");

            from("direct:process1")
                    .process(e -> e.getIn().setBody(Lists.newArrayList("a", "b", "c")));

            from("direct:process2")
                    .process(e -> e.getIn().setBody(Lists.newArrayList("1", "2", "3")));

            from("direct:process3")
                    .process(e -> e.getIn().setBody(Lists.newArrayList("@", "#", "$")));

            from("direct:endgame")
                    .process(e -> log.info(" This final result : {}", e.getIn().getBody()));
          }
        };
      }
    }
    
    /**
     * Aggregates the results of branches that ran in parallel by collecting every
     * branch's body into one flat list, one entry per branch.
     *
     * <p>The accumulated list is carried as the body of the exchange returned from
     * each call, so the exchange that leaves the multicast has a list body whose
     * size equals the number of aggregated branches. A single-branch result is
     * therefore a one-element list.
     */
    class MyAggregationStrategy implements AggregationStrategy {

      /**
       * Adds the body of {@code newExchange} to the results collected so far.
       *
       * @param oldExchange the exchange returned by the previous call, or {@code null}
       *                    when {@code newExchange} is the first reply
       * @param newExchange the reply that has just arrived
       * @return {@code newExchange}, with its body replaced by the list of all bodies
       *         collected so far (including its own)
       */
      @Override
      public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        ArrayList<Object> collected;
        if (oldExchange == null) {
          // First reply: start a fresh accumulator.
          collected = new ArrayList<>();
        } else {
          // The previous call stored the accumulator as the body of the exchange it
          // returned, so the cast is safe as long as this strategy is the only writer.
          @SuppressWarnings("unchecked")
          ArrayList<Object> previous = (ArrayList<Object>) oldExchange.getIn().getBody();
          collected = previous;
        }

        // Append instead of wrapping, so results stay flat for any number of branches.
        collected.add(newExchange.getIn().getBody());
        newExchange.getIn().setBody(collected);

        return newExchange;
      }
    }