I'm quite new to reactive programming, but am trying to write some new code using Project Reactor's Flux/Mono APIs, in a Spring Boot application.
Imperatively, I could write:
List<Datum> data = library.getData();
Map<String, Datum> lookup = data.stream().collect(toMap(Datum::getId, Function.identity()));
List<OutDatum> outData = data.stream()
    .map(d -> OutDatum.builder()
        .id(d.getId())
        ...
        .parent(lookup.get(d.getParent()))
        .build())
    .collect(toList());
send(outData);
What I did as a first step:
- cache() the Flux so that it won't redo the work of fetching from the library
- map() on the data Flux to transform each Datum to an OutDatum, using the lookup Mono (built with collectMap()) to get the parent data
Flux<Datum> data = emitAsFlux(library::getData)
    .cache();
Mono<Map<String, Datum>> lookup = data.collectMap(Datum::getId)
    .cache();
send(data.map(d -> OutDatum.builder()
    .id(d.getId())
    ...
    .parent(lookup.block().get(d.getParent()))
    .build()));
I understand that blocking is discouraged in reactive programming, but the dependency on the map seems to make it necessary. My understanding is that, because of the cache(), blocking multiple times is not detrimental, and I would refactor it out later if necessary.
My issue here is that it hangs. Because both refer to the same original Flux, the lookup map cannot be built: the Flux is not allowed to continue while map() is blocked waiting for the map. There's a deadlock.
I tried creating a proxy Flux using share(), but it didn't seem to help.
FYI, emitAsFlux looks something like this:
private Flux<Datum> emitAsFlux(final Callable<List<Datum>> dataProvider) {
    return Flux.create(emitter -> {
        taskExecutor.execute(() -> {
            try {
                dataProvider.call()
                    .forEach(emitter::next);
                emitter.complete();
            } catch (Exception e) {
                emitter.error(e);
            }
        });
    });
}
The source library returns a List, which means it is already blocking code.
I'm not sure, but I'm guessing the issue is caused by the following line:
.parent(lookup.block().get(d.getParent()))
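To make that guess more concrete, here is a rough JDK-only analogy (it does not use Reactor, and the class and method names are made up for illustration). A single producer thread waits for a result that can only be produced by work queued behind it on that same thread, which is roughly the situation when block() is called inside map() on the thread that is still emitting the items. A timeout is used so the sketch reports the stall instead of hanging forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only: an analogy for the hang, not Reactor's internals.
class BlockingDeadlockSketch {

    // Returns true if the wait timed out, i.e. the producer thread stalled itself.
    static boolean producerBlocksItself() {
        // One thread plays the role of the thread that emits the items.
        ExecutorService producer = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> lookup = new CompletableFuture<>();
            Future<Boolean> timedOut = producer.submit(() -> {
                // The work that completes the lookup is queued behind the
                // current task on the same single thread.
                producer.execute(() -> lookup.complete("ready"));
                try {
                    // Blocking here keeps the only producer thread busy,
                    // so the queued work can never run in time.
                    lookup.get(200, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true;
                }
            });
            return timedOut.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            producer.shutdownNow();
        }
    }
}
```

Without the timeout, the blocked task would wait indefinitely, which matches the hang described in the question.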
Since the source is already available as a List, which is not reactive, I would suggest creating a simple Map for the lookup instead of a Mono of a Map.
Your imperative code looks good; a modified version of it with reactive support is below:
List<Datum> data = library.getData();
Map<String, Datum> lookup = data.stream().collect(toMap(Datum::getId, identity()));
Flux<OutDatum> outData = Flux.fromIterable(data)
    .map(d -> OutDatum.builder()
        .id(d.getId())
        ...
        .parent(lookup.get(d.getParent()))
        .build());
send(outData);
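As a small runnable illustration of the plain-Map lookup, the sketch below uses only the JDK. The Datum and OutDatum classes here are simplified, hypothetical stand-ins (an id plus a parent id), not the real types from the question, and resolveParents is a made-up helper name. In a Reactor pipeline the same map could be read inside Flux.map() without any blocking, because it is fully built before the Flux is created.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-ins for the types in the question.
class ParentLookupSketch {

    static final class Datum {
        final String id;
        final String parentId; // null for a root element

        Datum(String id, String parentId) {
            this.id = id;
            this.parentId = parentId;
        }
    }

    static final class OutDatum {
        final String id;
        final Datum parent; // resolved parent, or null for a root element

        OutDatum(String id, Datum parent) {
            this.id = id;
            this.parent = parent;
        }
    }

    // Builds the id -> Datum lookup once, then resolves each parent from it.
    static List<OutDatum> resolveParents(List<Datum> data) {
        Map<String, Datum> lookup = data.stream()
                .collect(Collectors.toMap(d -> d.id, Function.identity()));
        return data.stream()
                .map(d -> new OutDatum(
                        d.id,
                        d.parentId == null ? null : lookup.get(d.parentId)))
                .collect(Collectors.toList());
    }
}
```

For example, resolving a list containing a "root" element and a "child" whose parent id is "root" yields an OutDatum for "child" whose parent is the "root" Datum.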
In send(outData) you can send the data as a Flux, and if you want the server to send the OutDatum items in chunks, use the TEXT_EVENT_STREAM content type.
As for improvements: for now your approach looks fine. One thing that could be improved later is the source data, which is currently non-reactive. At some point you could use a reactive DB as the source for the parent lookup, but I'm not sure; it depends on the use case.