I want to send multiple messages that will traverse the same route asynchronously and be able to know when all processing have completed.
Since I need to know when each route has terminated, I thought about using
ProducerTemplate#asyncRequestBody
which use InOut pattern so that calling get
on the Future
object returned will block until the route has terminated.
So far so good, each request are sent asynchronously to the route, and looping over all Future calling get method will block until all my routes have completed.
The problem is that, while the requests are sent asynchronously, I want them to be also consumed in parallel.
For example, consider P being the ProducerTemplate
, Rn beeing requests and En being endpoints - what I want is :
-> R0 -> from(E1).to(E2).to(E3) : done.
/
P -> R1 -> from(E1).to(E2).to(E3) : done.
\
-> R2 -> from(E1).to(E2).to(E3) : done.
^__ Requests consumed in parallel.
After a few research, I stumbled onto Competing Consumers which parallelize execution adding more consumers.
However, since there is multiple executions at the same time, this slow down the execution of each route which causes some ExchangeTimedOutException
:
The OUT message was not received within: 20000 millis due reply message with correlationID...
Not a surprise, as I am sending an InOut request. But actually, I don't really care of the response, I use it only to know
when my route has terminated. I would use an InOnly (ProducerTemplate#asyncSendBody
), but calling Future#get
would not block until
the entire task is completed.
Is there another alternative to send requests asynchronously and detect when they have all completed?
Note that changing the timeout is not an option in my case.
My first instinct is to recommend using NotifyBuilder to track the processing, more specifically using the whenBodiesDone
to target specific bodies.
EDIT:
Here's a trivial implementation but it does demonstrate a point:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Component
public static class ParallelProcessingRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("seda:test?concurrentConsumers=5")
.routeId("parallel")
.log("Received ${body}, processing")
.delay(5000)
.log("Processed ${body}")
.stop();
from("timer:testStarter?delay=3000&period=300000")
.routeId("test timer")
.process(exchange -> {
// messages we want to track
List<Integer> toSend = IntStream.range(0, 5).boxed().collect(toList());
NotifyBuilder builder = new NotifyBuilder(getContext())
.fromRoute("parallel")
.filter(e -> toSend.contains(e.getIn().getBody(Integer.class)))
.whenDone(toSend.size())
.create();
ProducerTemplate template = getContext().createProducerTemplate();
// messages we do not want to track
IntStream.range(10, 15)
.forEach(body -> template.sendBody("seda:test", body));
toSend.forEach(body -> template.sendBody("seda:test", body));
exchange.getIn().setBody(builder.matches(1, TimeUnit.MINUTES));
})
.log("Matched? ${body}");
}
}
}
And here's a sample of the logs:
2016-08-06 11:45:03.861 INFO 27410 --- [1 - seda://test] parallel : Received 10, processing
2016-08-06 11:45:03.861 INFO 27410 --- [5 - seda://test] parallel : Received 11, processing
2016-08-06 11:45:03.864 INFO 27410 --- [2 - seda://test] parallel : Received 12, processing
2016-08-06 11:45:03.865 INFO 27410 --- [4 - seda://test] parallel : Received 13, processing
2016-08-06 11:45:03.866 INFO 27410 --- [3 - seda://test] parallel : Received 14, processing
2016-08-06 11:45:08.867 INFO 27410 --- [1 - seda://test] parallel : Processed 10
2016-08-06 11:45:08.867 INFO 27410 --- [3 - seda://test] parallel : Processed 14
2016-08-06 11:45:08.867 INFO 27410 --- [4 - seda://test] parallel : Processed 13
2016-08-06 11:45:08.868 INFO 27410 --- [2 - seda://test] parallel : Processed 12
2016-08-06 11:45:08.868 INFO 27410 --- [5 - seda://test] parallel : Processed 11
2016-08-06 11:45:08.870 INFO 27410 --- [1 - seda://test] parallel : Received 0, processing
2016-08-06 11:45:08.872 INFO 27410 --- [4 - seda://test] parallel : Received 2, processing
2016-08-06 11:45:08.872 INFO 27410 --- [3 - seda://test] parallel : Received 1, processing
2016-08-06 11:45:08.872 INFO 27410 --- [2 - seda://test] parallel : Received 3, processing
2016-08-06 11:45:08.872 INFO 27410 --- [5 - seda://test] parallel : Received 4, processing
2016-08-06 11:45:13.876 INFO 27410 --- [1 - seda://test] parallel : Processed 0
2016-08-06 11:45:13.876 INFO 27410 --- [3 - seda://test] parallel : Processed 1
2016-08-06 11:45:13.876 INFO 27410 --- [4 - seda://test] parallel : Processed 2
2016-08-06 11:45:13.876 INFO 27410 --- [5 - seda://test] parallel : Processed 4
2016-08-06 11:45:13.876 INFO 27410 --- [2 - seda://test] parallel : Processed 3
2016-08-06 11:45:13.877 INFO 27410 --- [r://testStarter] test timer : Matched? true
You'll notice how the NotifyBuilder
returned the result as soon as the results matched.