How to consume multiple messages in parallel and detect when all executions have completed?


I want to send multiple messages that will traverse the same route asynchronously, and I want to know when all processing has completed.

Since I need to know when each route has terminated, I thought about using ProducerTemplate#asyncRequestBody, which uses the InOut pattern, so that calling get on the returned Future object blocks until the route has terminated.

So far so good: each request is sent asynchronously to the route, and looping over all the Futures and calling get on each one blocks until all my routes have completed.

The problem is that, while the requests are sent asynchronously, I also want them to be consumed in parallel.

For example, with P being the ProducerTemplate, Rn being requests and En being endpoints, what I want is:

  ->   R0 -> from(E1).to(E2).to(E3) : done.
 /
P ->   R1 -> from(E1).to(E2).to(E3) : done.
 \  
  ->   R2 -> from(E1).to(E2).to(E3) : done.

        ^__ Requests consumed in parallel.

After some research, I stumbled onto Competing Consumers, which parallelizes execution by adding more consumers.

However, since there are multiple executions at the same time, this slows down the execution of each route, which causes some ExchangeTimedOutException errors:

The OUT message was not received within: 20000 millis due reply message with correlationID...

Not a surprise, as I am sending an InOut request. But I don't really care about the response; I use it only to know when my route has terminated. I could use InOnly (ProducerTemplate#asyncSendBody) instead, but then calling Future#get would not block until the entire task is completed.

Is there another way to send requests asynchronously and detect when they have all completed?

Note that changing the timeout is not an option in my case.


Solution

  • My first instinct is to recommend using NotifyBuilder to track the processing, more specifically its whenBodiesDone condition to target specific bodies.

    EDIT:

    Here's a trivial implementation that demonstrates the point. It combines filter with whenDone so that only the tracked bodies are counted:

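    import static java.util.stream.Collectors.toList;

    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.IntStream;

    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.builder.NotifyBuilder;
    import org.apache.camel.builder.RouteBuilder;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.stereotype.Component;

    @SpringBootApplication
    public class DemoApplication {

      public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
      }

      @Component
      public static class ParallelProcessingRouteBuilder extends RouteBuilder {
        @Override
        public void configure() throws Exception {
          // five consumers take messages off the queue in parallel
          from("seda:test?concurrentConsumers=5")
              .routeId("parallel")
              .log("Received ${body}, processing")
              .delay(5000) // stand-in for real work
              .log("Processed ${body}")
              .stop();

          from("timer:testStarter?delay=3000&period=300000")
              .routeId("test timer")
              .process(exchange -> {
                // messages we want to track
                List<Integer> toSend = IntStream.range(0, 5).boxed().collect(toList());

                // matches once toSend.size() exchanges from the "parallel" route
                // that carry one of the tracked bodies are done
                NotifyBuilder builder = new NotifyBuilder(getContext())
                    .fromRoute("parallel")
                    .filter(e -> toSend.contains(e.getIn().getBody(Integer.class)))
                    .whenDone(toSend.size())
                    .create();

                ProducerTemplate template = getContext().createProducerTemplate();

                // messages we do not want to track
                IntStream.range(10, 15)
                    .forEach(body -> template.sendBody("seda:test", body));

                toSend.forEach(body -> template.sendBody("seda:test", body));

                // blocks until the condition matches or one minute has elapsed
                exchange.getIn().setBody(builder.matches(1, TimeUnit.MINUTES));
              })
              .log("Matched? ${body}");
        }
      }
    }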
    

    And here's a sample of the logs:

    2016-08-06 11:45:03.861  INFO 27410 --- [1 - seda://test] parallel                                 : Received 10, processing
    2016-08-06 11:45:03.861  INFO 27410 --- [5 - seda://test] parallel                                 : Received 11, processing
    2016-08-06 11:45:03.864  INFO 27410 --- [2 - seda://test] parallel                                 : Received 12, processing
    2016-08-06 11:45:03.865  INFO 27410 --- [4 - seda://test] parallel                                 : Received 13, processing
    2016-08-06 11:45:03.866  INFO 27410 --- [3 - seda://test] parallel                                 : Received 14, processing
    2016-08-06 11:45:08.867  INFO 27410 --- [1 - seda://test] parallel                                 : Processed 10
    2016-08-06 11:45:08.867  INFO 27410 --- [3 - seda://test] parallel                                 : Processed 14
    2016-08-06 11:45:08.867  INFO 27410 --- [4 - seda://test] parallel                                 : Processed 13
    2016-08-06 11:45:08.868  INFO 27410 --- [2 - seda://test] parallel                                 : Processed 12
    2016-08-06 11:45:08.868  INFO 27410 --- [5 - seda://test] parallel                                 : Processed 11
    2016-08-06 11:45:08.870  INFO 27410 --- [1 - seda://test] parallel                                 : Received 0, processing
    2016-08-06 11:45:08.872  INFO 27410 --- [4 - seda://test] parallel                                 : Received 2, processing
    2016-08-06 11:45:08.872  INFO 27410 --- [3 - seda://test] parallel                                 : Received 1, processing
    2016-08-06 11:45:08.872  INFO 27410 --- [2 - seda://test] parallel                                 : Received 3, processing
    2016-08-06 11:45:08.872  INFO 27410 --- [5 - seda://test] parallel                                 : Received 4, processing
    2016-08-06 11:45:13.876  INFO 27410 --- [1 - seda://test] parallel                                 : Processed 0
    2016-08-06 11:45:13.876  INFO 27410 --- [3 - seda://test] parallel                                 : Processed 1
    2016-08-06 11:45:13.876  INFO 27410 --- [4 - seda://test] parallel                                 : Processed 2
    2016-08-06 11:45:13.876  INFO 27410 --- [5 - seda://test] parallel                                 : Processed 4
    2016-08-06 11:45:13.876  INFO 27410 --- [2 - seda://test] parallel                                 : Processed 3
    2016-08-06 11:45:13.877  INFO 27410 --- [r://testStarter] test timer                               : Matched? true
    

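    You'll notice that the NotifyBuilder returned as soon as the five tracked messages (0 to 4) were done, about a millisecond after the last "Processed" line and well before the one-minute timeout. The untracked messages (10 to 14) did not count toward the condition.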
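
To make the idea behind filter + whenDone + matches more concrete, here is a rough plain-JDK analogy. It is not how Camel implements NotifyBuilder and it uses no Camel API: the class TrackedCompletionSketch and the methods sendAndAwait and process are hypothetical names made up for this sketch. A fixed thread pool stands in for the concurrent consumers, and a CountDownLatch stands in for the notifier. The sketch assumes each tracked body is sent exactly once, and, like a "done" condition, it counts a tracked message whether processing succeeded or failed.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy only: none of these names come from Camel.
class TrackedCompletionSketch {

    // Stand-in for the work a route would do on one message.
    static void process(Integer body) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits every body to a pool of `consumers` threads, then waits until
    // every body in `tracked` has been processed or the timeout elapses.
    // Returns true if all tracked bodies finished in time, false otherwise.
    static boolean sendAndAwait(List<Integer> allBodies, Set<Integer> tracked,
                                int consumers, long timeout, TimeUnit unit)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        CountDownLatch done = new CountDownLatch(tracked.size());
        try {
            for (Integer body : allBodies) {
                pool.submit(() -> {
                    try {
                        process(body);
                    } finally {
                        // only tracked bodies count toward completion
                        if (tracked.contains(body)) {
                            done.countDown();
                        }
                    }
                });
            }
            // fire-and-forget sends above; this is the only blocking call
            return done.await(timeout, unit);
        } finally {
            // accept no new work; already submitted messages still finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> all = List.of(10, 11, 12, 13, 14, 0, 1, 2, 3, 4);
        Set<Integer> tracked = Set.of(0, 1, 2, 3, 4);
        boolean matched = sendAndAwait(all, tracked, 5, 1, TimeUnit.MINUTES);
        System.out.println("Matched? " + matched); // prints "Matched? true"
    }
}
```

The point of the analogy is that the sender never waits on an individual reply, so there is no per-message reply timeout to hit; the only timeout is the overall one passed to the final wait.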