I'm picking up Java/Reactor after moving over from C#. I'm well versed in the C# async-await approach to non-blocking calls and am struggling to adapt to Flux/Mono.
I'm implementing a solution where I need to make a call to Elasticsearch via the Java SDK, get results, apply additional filters to strip out unwanted ES results, and keep paging through ES until my final collection of results is complete.
The ES SDK doesn't support Reactor, but there are examples of Java adapter code that take the ES callback and convert it to a Mono (I see a direct correlation to C# async-await here, as this is a non-blocking call to ES). What I then struggle with is the next bit: I need to take the results from the ES Mono and filter them.
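For reference, the shape of that adapter code is roughly the following sketch (assuming the high-level REST client; the exact searchAsync overload varies by SDK version):

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import reactor.core.publisher.Mono;

// Bridge the ES callback API into a Mono: the success/failure callbacks become
// the Mono's terminal signals, much like awaiting a Task in C#.
Mono<SearchResponse> search(RestHighLevelClient client, SearchRequest sr) {
    return Mono.create(sink ->
            client.searchAsync(sr, RequestOptions.DEFAULT,
                    ActionListener.wrap(sink::success, sink::error)));
}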
I do this by calling out to other external services to get additional data based on the results from the ES call, so I need to know the ids in each page of the ES Mono's results before I can apply the filtering (effectively a kind of block). I then apply the in-memory filters and, if I don't have enough content, go back to ES to get the next page... repeating until I have enough data or there are no more results from ES.
This appears to be very difficult to achieve compared to C# but I probably just don't understand the Java paradigm correctly.
My problem is that I can't use block(), as this throws an error in Reactor 3.2, so I don't really know how to "wait" until the Mono calls to ES and the external services are complete before continuing. In C#, this would be as simple as a call to an async method with an await to handle the implicit callbacks.
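From what I can tell, the closest Reactor gets to await is moving the "code after the await" into the chain itself, e.g. (a sketch reusing the method names from my code below; Result is a placeholder type):

// C#: var page = await SearchAsync(sr); var filtered = PostFilter(page);
// Reactor: the continuation becomes operators on the publisher instead.
Mono<List<Result>> filtered = elasticsearch.results(sr)   // Flux<Result>, one page
        .collectList()                                     // Mono<List<Result>>
        .map(chunk -> postFilterResults(chunk));           // runs once the page arrives

But that only covers a single call, not the paged loop I describe above.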
My blocking version (works in IntelliJ but fails when published via Maven and run in a web server: Reactor 3.2 throws an IllegalStateException when block() is called from a thread it marks as non-blocking, such as a Netty event loop) is effectively:
do {
    chunk.clear();                  // reset the page buffer each iteration
    var sr = GetSearchRequest(xxxx);
    this.elasticsearch.results(sr)
        .doOnNext(chunk::add)       // accumulate each row from the current page
        .blockLast();               // wait for the page to complete (this is the call that throws)
    if (chunk.size() == 0) {
        isComplete = true;
    }
    else {
        var filtered = postFilterResults(chunk);
        finalResults.addAll(filtered);
        if (finalResults.size() >= MAXIMUM_RESULTS) {
            isComplete = true;
        }
        esPage = esPage + 1;
    }
} while (!isComplete);
If I try subscribe() or other non-blocking Reactor calls, then (obviously) the code skips over the "get ES" bit and hits the do-while, looping repeatedly until the callback from ES finally happens and the subscribed map is invoked.
I think I need to perform an "async block" for each ES call but I don't know how.
To answer my own question... The underlying issue, IMO, is that Flux/Mono simply is not like any existing programming style, in that it absolutely forces you to work within the fluent style that Reactor mandates. This is very similar to C# LINQ, but it's almost a "false friend", as even things like loops need to be expressed through Reactor's operators.
In this case, the key issue to solve is paging, and keeping that paging going within a loop. It is very unclear how to achieve this, because a subscription to a Flux "locks in" the original parameters, so repeating the subscription simply fetches the same page again. The solution is the Flux.defer method, which forces lazy building of the subscription on each repeated invocation. You then need AtomicIntegers to keep track of the page counter across the different calls. Again, this is something that C# handles for you, so it can catch a .NET developer out.
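A contrived sketch of the difference (method signatures simplified from the code below; Document is a placeholder type):

AtomicInteger page = new AtomicInteger();

// Without defer: the request is built once, at assembly time, so every
// repeat() re-subscribes to the same page-0 search.
Flux<Document> samePage = elasticsearch.results(GetSearchRequest(request, page.get()));

// With defer: the supplier runs on every subscription, so each repeat()
// cycle builds a fresh request with the current page counter.
Flux<Document> nextPage = Flux.defer(() ->
        elasticsearch.results(GetSearchRequest(request, page.getAndIncrement())));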
The full pipeline then looks something like:
//The response from the elasticsearch adapter is a Flux<T>, but we do not want to filter
//results on a row-by-row basis, as this incurs one call to the DB/network for each row
//(as appropriate). We choose to batch these up.
var result = new SearchResult();
var page = new AtomicInteger();
var chunkSize = new AtomicInteger();

//Use a defer so we recalculate the subscription to the search with the new page count
var results = Flux.defer(() -> elasticsearch.results(GetSearchRequest(request, lc, pf, page.get()))
        //When a page completes, reset the chunk counter and advance the page
        .doOnComplete(() -> {
            chunkSize.set(0);
            page.getAndAdd(1);
        })
        //Collect the whole page so the post-filter runs once per page, not per row
        .collectList()
        .map(chunk -> {
            chunkSize.set(chunk.size());
            return chunk;
        })
        .map(chunk -> postFilterResults(request, chunk, pf))
        .map(filtered -> result.getDocuments().addAll(filtered)));

//Repeat the deferred flux (recalculating each time) until we have enough content
//or we get nothing back from the search engine
return results
        .repeat()
        .takeUntil(r -> chunkSize.get() == 0
                || result.getDocuments().size() >= this.elasticsearch.getMaximumSearchResults())
        .take(this.elasticsearch.getMaximumSearchResults())
        .collectList()
        .map(r -> {
            result.setTotalHits(result.getDocuments().size());
            return result;
        });