java, reactive-programming, spring-webflux, project-reactor

Flux data depends on data later in the stream - Seeking better patterns


I'm quite new to reactive programming, but I'm trying to write some new code using Project Reactor's Flux/Mono APIs in a Spring Boot application.

  1. I have a finite stream of data (with unique IDs) coming from some library.
  2. Each datum in the stream has a "parent" datum in the stream, which might have been emitted before, or is still to come.
  3. I need to transform this data into objects that include the parent before sending it on to another system.

Imperatively, I could write:

List<Datum> data = library.getData();
Map<String, Datum> lookup = data.stream().collect(toMap(Datum::getId, Function.identity()));
List<OutDatum> outData = data.stream()
                             .map(d -> OutDatum.builder()
                                               .id(d.getId())
                                               ...
                                               .parent(lookup.get(d.getParent()))
                                               .build())
                             .collect(toList());
send(outData);

What I did as a first step

  1. Create a Flux from the data (emitting from the list for now, hoping that the library can later actually provide the data in a streaming fashion).
  2. cache() the flux so that it won't redo the work of fetching from the library.
  3. Create a lookup Mono from the data Flux that collects the data into a map keyed by ID.
  4. Use map() on the data Flux to transform each Datum to an OutDatum, using the above lookup Mono to get the parent data.
  5. Pass the mapped data Flux on to the WebClient to send.

Flux<Datum> data = emitAsFlux(library::getData)
                       .cache();

Mono<Map<String, Datum>> lookup = data.collectMap(Datum::getId)
                                      .cache();

send(data.map(d -> OutDatum.builder()
                           .id(d.getId())
                           ...
                           .parent(lookup.block().get(d.getParent()))
                           .build()));

I understand that blocking is not something we like to see in reactive programming, but because of the dependency on the map it seemed necessary (and, as I understand it, the cache() means blocking multiple times is not detrimental; that's something I would refactor out later if necessary).

My issue here is that it hangs. Because both refer to the same original Flux, the lookup map can never be built: building it requires the Flux to complete, but the Flux is stuck waiting on the block(). There's a deadlock. I tried creating a proxy Flux using share(), but it didn't seem to help.

  1. Is there a way to let the map exhaust the stream while the data Flux is still processing elements earlier in the stream?
  2. I'd love to know what a good pattern for implementing this reactively would be.
  3. I'd also like to know where I'm being stupid or appear to be lacking understanding.

FYI emitAsFlux looks something like this:

    private Flux<Datum> emitAsFlux(final Callable<List<Datum>> dataProvider) {
        return Flux.create(emitter -> {
            taskExecutor.execute(() -> {
                try {
                    dataProvider.call()
                                .forEach(emitter::next);
                    emitter.complete();
                } catch (Exception e) {
                    emitter.error(e);
                }
            });
        });
    }
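
For reference, a more compact equivalent of emitAsFlux (just a sketch, assuming Reactor's built-in boundedElastic scheduler is acceptable in place of the custom taskExecutor) could be:

    private Flux<Datum> emitAsFlux(final Callable<List<Datum>> dataProvider) {
        // Mono.fromCallable defers the blocking call and propagates any exception
        // as an error signal; subscribeOn moves the work off the subscribing thread.
        return Mono.fromCallable(dataProvider)
                   .flatMapMany(Flux::fromIterable)
                   .subscribeOn(Schedulers.boundedElastic());
    }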


Solution

  • The source library is returning a List, which means it is already blocking code.

    I'm not sure, but I am guessing the issue is happening due to the following code:

    .parent(lookup.block().get(d.getParent()))
    

    Since the source is already available as a List, which is not reactive, I would suggest creating a plain Map for the lookup instead of a Mono of a Map.

    Your imperative code looks good; a modified version of it with reactive support is below:

    List<Datum> data = library.getData();
    Map<String, Datum> lookup = data.stream().collect(toMap(Datum::getId, identity()));
    
    Flux<OutDatum> outData = Flux.fromIterable(data)
                                 .map(d -> OutDatum.builder()
                                                   .id(d.getId())
                                                   ...
                                                   .parent(lookup.get(d.getParent()))
                                                   .build());
    send(outData);
    

    In send(outData) you can send the data as a Flux, and if you want the server to send each OutDatum in chunks, use the TEXT_EVENT_STREAM content type.
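
    For illustration, a minimal WebFlux endpoint that streams the result as server-sent events could look like the sketch below (the path, class, and service names are made up for the example):

    import org.springframework.http.MediaType;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Flux;

    @RestController
    class OutDatumController {

        // Hypothetical service that assembles the Flux<OutDatum> as shown above.
        private final OutDatumService outDatumService;

        OutDatumController(OutDatumService outDatumService) {
            this.outDatumService = outDatumService;
        }

        // TEXT_EVENT_STREAM makes WebFlux flush each OutDatum as its own event
        // instead of buffering the whole response.
        @GetMapping(path = "/out-data", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        Flux<OutDatum> streamOutData() {
            return outDatumService.buildOutData();
        }
    }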

    Regarding improvements: for now your approach looks fine. One improvement that could be made later concerns the source data, which is currently non-reactive; maybe at some point a reactive DB could be used as the source for the parent lookup. But I am not sure, it depends on the use case.
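
    If the library (or a reactive DB) can eventually expose the data as a Flux, the whole pipeline can stay non-blocking by building the lookup map first and then re-streaming the cached Flux, which also avoids the deadlock from block(). A sketch, assuming a hypothetical reactiveLibrary.getData() that returns Flux<Datum>:

    Flux<Datum> data = reactiveLibrary.getData()  // hypothetical reactive source
                                      .cache();   // replay the elements for the second pass

    Flux<OutDatum> outData =
            data.collectMap(Datum::getId)             // Mono<Map<String, Datum>>
                .flatMapMany(lookup -> data.map(d ->  // second pass replays from cache(); no block()
                        OutDatum.builder()
                                .id(d.getId())
                                // ... other fields elided as in the examples above
                                .parent(lookup.get(d.getParent()))
                                .build()));

    send(outData);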