Let's say that we have the following entities, Project and Release, which have a one-to-many relationship.
When consuming an event from an SQS queue, where a release id is sent as part of the event, there might be scenarios where we have to create thousands of releases in our DB. For each release we have to make a REST call to a third-party service to get some information about it.
That means we might have to make thousands of calls, in some cases more than 20k, just to retrieve the information for the different releases and store it in the DB.
Obviously this is not scalable, so I'm not really sure which way to go in this scenario.
I know I could use a CompletableFuture, but I'm not sure how to use it with Spring.
The HTTP client that I am using is WebClient.
Any ideas?
You can make the save queries in a method run in a single transaction by adding the @Transactional annotation above the method signature. The method should also be public; with Spring's default proxy-based setup, the annotation is otherwise ignored.
As for using CompletableFuture in Spring: you can make an HTTP call method asynchronous by adding the @Async annotation above its signature and declaring CompletableFuture as its return type. Inside the method, return a completed future holding the response value from the HTTP call. You can easily create one with CompletableFuture.completedFuture(yourValue). The caller receives a CompletableFuture immediately, and Spring completes it with that value once the asynchronous method has finished executing its code block.
For @Async to work, you must also add the @EnableAsync annotation to one of your configuration classes. On top of that, the @Async annotated method must be public and must not be called from another method within the same class. If the method is private or is called from within the same class, the @Async annotation is ignored and the method is executed in the same thread as the calling method.
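To see the fan-out pattern without any Spring machinery, here is a minimal plain-Java sketch of roughly what many @Async calls returning CompletableFuture amount to: one asynchronous task per release id on an explicit executor, then a single wait for all of them. It assumes a hypothetical fetchReleaseInfo method standing in for the WebClient call, and uses String in place of ReleaseInfo to stay self-contained. It is a sketch, not Spring's actual implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class AsyncReleaseFetcher {

    // Hypothetical stand-in for the HTTP call, e.g. a WebClient request ending in block().
    static String fetchReleaseInfo(long releaseId) {
        return "info-" + releaseId;
    }

    // Starts one asynchronous task per id on the given executor, then waits for all of them.
    static Map<Long, String> fetchAll(List<Long> releaseIds, ExecutorService executor) {
        List<CompletableFuture<Map.Entry<Long, String>>> futures = releaseIds.stream()
                .map(id -> CompletableFuture.supplyAsync(
                        () -> Map.entry(id, fetchReleaseInfo(id)), executor))
                .collect(Collectors.toList());

        // allOf completes once every future has completed; join() rethrows a failure as CompletionException.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(16); // pool size chosen arbitrarily for the example
        try {
            System.out.println(fetchAll(List.of(1L, 2L, 3L), executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

The executor is passed explicitly because supplyAsync without one uses the common ForkJoinPool, which is sized for CPU-bound work rather than for thousands of blocking HTTP calls.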
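Instead of an @Async annotated method, you could also use a parallelStream to execute the HTTP calls in parallel. Keep in mind that a parallel stream runs on the common ForkJoinPool, whose parallelism defaults to one less than the number of available CPU cores, so only a few of the 20K calls are in flight at any one time. For example: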
List<Long> releaseIds = new ArrayList<>(); // the ids to fetch, e.g. taken from the SQS event
// webClient.getReleaseInfo(...) stands for your own blocking wrapper around WebClient
Map<Long, ReleaseInfo> releaseInfo = releaseIds.parallelStream()
        .collect(Collectors.toMap(Function.identity(), releaseId -> webClient.getReleaseInfo(releaseId)));
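The parallel-stream snippet above depends on the asker's own types. Here is a self-contained version of the same idea, which also prints the common pool's parallelism so you can see how many calls can run at once on your machine. fetchReleaseInfo is a hypothetical stand-in for the HTTP call, and String replaces ReleaseInfo.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class ParallelStreamReleaseFetcher {

    // Hypothetical stand-in for a blocking call that wraps WebClient.
    static String fetchReleaseInfo(long releaseId) {
        return "info-" + releaseId;
    }

    // Runs the calls on the common ForkJoinPool (plus the calling thread), so only a few run at once.
    static Map<Long, String> fetchAll(List<Long> releaseIds) {
        return releaseIds.parallelStream()
                .collect(Collectors.toMap(Function.identity(), ParallelStreamReleaseFetcher::fetchReleaseInfo));
    }

    public static void main(String[] args) {
        List<Long> ids = LongStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
        System.out.println("common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("fetched: " + fetchAll(ids).size());
    }
}
```

Because the common pool is shared by the whole JVM, long blocking calls in a parallel stream can also slow down other code that relies on it.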
Lastly, you could also use a ThreadPoolExecutor to execute the HTTP calls in parallel. An example:
List<Long> releaseIds = new ArrayList<>(); // the ids to fetch, e.g. taken from the SQS event
// I've made the number of threads in the pool equal to the number of available CPU cores.
// The HTTP calls mostly wait on I/O, so a larger pool may give better throughput.
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
// Submit one task per release; webClient.getReleaseInfo(...) stands for your own blocking wrapper around WebClient
List<Future<ReleaseInfo>> releaseInfoFutures = releaseIds.stream()
        .map(releaseId -> executor.submit(() -> webClient.getReleaseInfo(releaseId)))
        .collect(Collectors.toList());
// Wait for all futures to complete and keep only the non-null results
List<ReleaseInfo> releaseInfos = releaseInfoFutures.stream()
        .map(this::getValueAfterFutureCompletion)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
private ReleaseInfo getValueAfterFutureCompletion(Future<ReleaseInfo> future) {
    try {
        return future.get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
        return null;
    } catch (ExecutionException e) {
        e.printStackTrace(); // the HTTP call for this release threw an exception
        return null;
    }
}
Make sure to shut down the ThreadPoolExecutor with shutdown() or shutdownNow() once you're done with it. Otherwise its idle threads stay alive, which leaks threads and the memory they hold.
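Here is a self-contained sketch of the executor approach that uses ExecutorService.invokeAll to wait for every task at once and shuts the pool down in a finally block, so it is released even if something throws. fetchReleaseInfo is a hypothetical stand-in for the HTTP call, String replaces ReleaseInfo, and the pool is created per call only to keep the example self-contained. As in the snippet above, failed calls are simply skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

class ExecutorReleaseFetcher {

    // Hypothetical stand-in for a blocking call that wraps WebClient.
    static String fetchReleaseInfo(long releaseId) {
        return "info-" + releaseId;
    }

    // Runs one task per id on a dedicated pool and returns the successful results in input order.
    static List<String> fetchAll(List<Long> releaseIds, int poolSize) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<String>> tasks = releaseIds.stream()
                    .map(id -> (Callable<String>) () -> fetchReleaseInfo(id))
                    .collect(Collectors.toList());
            // invokeAll blocks until every task has completed, normally or with an exception.
            List<Future<String>> futures = executor.invokeAll(tasks);
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                try {
                    results.add(future.get()); // the task is already done, so get() returns immediately
                } catch (ExecutionException e) {
                    // the call for this release failed; it is skipped here (log it in real code)
                }
            }
            return results;
        } finally {
            executor.shutdown(); // lets the pool's threads terminate
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(fetchAll(List.of(1L, 2L, 3L), 8));
    }
}
```

shutdown() is used here rather than shutdownNow() because all tasks have already finished at that point, so there is nothing left to interrupt.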