java project-reactor reactive-streams spring-reactive spring-reactor

How to combine/chain multiple Mono/Flux containing different datatypes without nested subscriptions


We are using project-reactor to retrieve some data from an external webservice and generate a bunch of resulting objects.

First, we need to fetch some masterdata that is needed to trigger the next webservice calls. Once the masterdata is available, we retrieve some more data based on it. Next, we have to wait for all Monos to emit their results. Then we process all the data and build our resulting object.

We don't have much experience with reactive streams. Our solution with nested subscriptions works, but we believe there might be a better way to achieve what we want to do.

Question 1

Masterdata_A and Masterdata_B could be fetched in parallel, but how can we express this in a reactive way without nesting? Each result of getFluxMasterdata_B should be combined with the single result of getMonoMasterdata_A.

Question 2

The tuple with both masterdata values should be limited in some way so that we don't overwhelm the webservice with too many data requests. The current delay of 1 second is just a guess that seems to work, but it would be better to define a maximum number of parallel executions of the first inner flatMap, so that at most N webservice calls are pending at a time.

Question 3

In the future, there might be more data we must fetch from the webservice to build the ProcessingResult. Is there a best practice for defining the reactive stream so that it stays readable and understandable? Is nesting reactive streams OK, or should it be avoided (keeping everything at the top level)?


DomainModel

    private static class Masterdata_A
    {
        private List<MasterdataRecord_A> records;
    }

    private static class MasterdataRecord_A { /* ... business relevant fields */ }
    private static class MasterdataRecord_B { /* ... business relevant fields */ }
    private static class Data_A { /* ... business relevant fields */ }
    private static class Data_B { /* ... business relevant fields */ }
    private static class Data_C { /* ... business relevant fields */ }

    private static class ProcessingResult { /* ... business relevant fields */ }

WebserviceImpl

    private static class Webservice
    {
        private Mono<Masterdata_A> getMonoMasterdata_A() { /* fetch data from external webservice */ }
        private Flux<MasterdataRecord_B> getFluxMasterdata_B() { /* fetch data from external webservice */ }

        private Mono<Data_A> getMonoData_A(Masterdata_A masterdataA) { /* fetch data from external webservice */ }
        private Mono<Data_B> getMonoData_B(MasterdataRecord_B masterdataB) { /* fetch data from external webservice */ }
        private Mono<Data_C> getMonoData_C(Masterdata_A masterdataA, MasterdataRecord_B masterdataB) { /* fetch data from external webservice */ }
    }

BusinessServiceImpl

    public class BusinessService
    {
        public void processData(...params...)
        {
            Webservice webservice = getWebservice();
            // As soon as Mono<Masterdata_A> emits its result AND Flux<Masterdata_B> emits its first result, the first inner flatMap should be executed
            // to fetch some extra data from the service based on the actual masterdata.
            // For building the ProcessingResult we need access to all data available in the actual context.
            webservice.getMonoMasterdata_A()
                    .subscribe((Masterdata_A masterdataA) -> {
                        webservice.getFluxMasterdata_B()
                                .delayElements(Duration.ofSeconds(1))
                                .flatMap((MasterdataRecord_B masterdataB) -> {
                                    Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
                                    Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
                                    Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
                                    // wait for result of all Monos
                                    return Mono.zip(monoA, monoB, monoC);
                                })
                                .flatMap((Tuple3<Data_A, Data_B, Data_C> data) -> {
                                    Data_A dataA = data.getT1();
                                    Data_B dataB = data.getT2();
                                    Data_C dataC = data.getT3();

                                    // create result from masterdataA, masterdataB, dataA, dataB, dataC
                                    ProcessingResult result = ...;
                                    return Mono.just(result);
                                })
                                .subscribe(processingResult -> {
                                    // store result to db/filesystem
                                });
                    });
        }
    }

Solution

  • Question 1

    Mono<Masterdata_A> monoMasterdata_a = webservice.getMonoMasterdata_A();
    Flux<MasterdataRecord_B> masterdataRecordBFlux = webservice.getFluxMasterdata_B();
    
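    // Suppose getMonoMasterdata_A returns just "A" and getFluxMasterdata_B returns [1, 2, 3, ...];
    // then, if A arrives before the first B record, the result will be [(A,1), (A,2), (A,3), ...].
    // combineLatest subscribes to monoMasterdata_a and masterdataRecordBFlux at the same time,
    // so both webservice calls run in parallel.
    // Caveat: until A has emitted, combineLatest only keeps the latest B record, so B records
    // emitted before A arrives are dropped.
    Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)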
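The combineLatest caveat matters if the B records can arrive before A. The sketch below compares `combineLatest` with one alternative that also subscribes to both sources immediately but pairs every record with A: zipping the records with a cached, repeated Mono. It assumes reactor-core on the classpath; the `"A"` / `1, 2, 3` values and the 100 ms delay are hypothetical stand-ins for the two webservice calls.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.time.Duration;
import java.util.List;

public class PairWithMasterdata {

    // Subscribes to both sources immediately and pairs EVERY record with the single A value.
    // cache() fetches A only once; repeat() replays the cached value for each record.
    // zip completes as soon as the records Flux is exhausted.
    static <A, B> Flux<Tuple2<A, B>> pairEach(Mono<A> masterdataA, Flux<B> recordsB) {
        return Flux.zip(masterdataA.cache().repeat(), recordsB);
    }

    // The combineLatest variant, for comparison: before A arrives it only remembers the latest record.
    static <A, B> Flux<Tuple2<A, B>> latestOnly(Mono<A> masterdataA, Flux<B> recordsB) {
        return Flux.combineLatest(masterdataA, recordsB, Tuples::of);
    }

    // Hypothetical timing: A arrives after 100 ms, records 1, 2, 3 are emitted immediately.
    static List<String> demo(boolean useCombineLatest) {
        Mono<String> masterdataA = Mono.just("A").delayElement(Duration.ofMillis(100));
        Flux<Integer> recordsB = Flux.just(1, 2, 3);
        Flux<Tuple2<String, Integer>> pairs = useCombineLatest
                ? latestOnly(masterdataA, recordsB)
                : pairEach(masterdataA, recordsB);
        return pairs.map(t -> t.getT1() + t.getT2()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // [A1, A2, A3]
        System.out.println(demo(true));  // [A3]: records 1 and 2 arrived before A and were overwritten
    }
}
```

If A is known to arrive first (or only the latest B matters), `combineLatest` is fine; otherwise the zip-based pairing avoids silently losing records.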
    

  • Question 2

    Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)
            // Optional: delayElements only throttles the rate at which tuples are emitted.
            // 500 ms is an arbitrary value; test and tune it for these services.
            .delayElements(Duration.ofMillis(500))
            .flatMap((Tuple2<Masterdata_A, MasterdataRecord_B> tuple2) -> {
                        Masterdata_A masterdataA = tuple2.getT1();
                        MasterdataRecord_B masterdataB = tuple2.getT2();
                        Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
                        Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
                        Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
                        // wait for the results of all Monos
                        return Mono.zip(monoA, monoB, monoC);
                    },
                    // The second argument of flatMap is the maximum number of inner Monos subscribed
                    // at once: at most 5 tuples are processed concurrently (each triggers three calls,
                    // so up to 15 requests can be in flight). This is what caps the load on the service.
                    // 5 is an arbitrary value as well; test to find the best value.
                    5)
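To check that the concurrency argument of `flatMap` really caps the number of in-flight calls, here is a small self-contained sketch (assuming reactor-core on the classpath). `fakeCall` is a hypothetical stand-in for a webservice call that takes 50 ms; it counts how many calls are running at the same time.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedConcurrency {

    // Hypothetical webservice call: takes 50 ms and tracks concurrent executions.
    static Mono<Integer> fakeCall(int id, AtomicInteger inFlight, AtomicInteger maxInFlight) {
        return Mono.defer(() -> {
                    // count this call as started and remember the highest count seen
                    int now = inFlight.incrementAndGet();
                    maxInFlight.accumulateAndGet(now, Math::max);
                    return Mono.just(id).delayElement(Duration.ofMillis(50));
                })
                // runs before the result is handed to flatMap, so the count drops first
                .doOnTerminate(inFlight::decrementAndGet);
    }

    // Runs 20 calls through flatMap with the given concurrency and returns the peak in-flight count.
    static int maxConcurrentCalls(int concurrency) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        Flux.range(1, 20)
                // the second argument limits how many inner Monos are subscribed at once
                .flatMap(id -> fakeCall(id, inFlight, maxInFlight), concurrency)
                .blockLast();
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent calls: " + maxConcurrentCalls(5));
    }
}
```

Unlike `delayElements`, which only spaces out emissions, the concurrency argument bounds the number of outstanding calls regardless of how fast or slow the service responds, which is closer to the "at most N waiting calls" requirement in the question.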
    

  • Question 3

    Take a look at this discussion: https://github.com/reactor/reactive-streams-commons/issues/21
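One common way to keep a growing pipeline readable (a suggestion, not the only option) is to move each nested step into a small named method that returns a Mono, so the top-level chain stays flat. Because the per-record context is captured inside that method, the masterdata is still in scope when the result is built. `Context`, `fetchDataA`, `fetchDataB` and `buildResult` are hypothetical simplifications of the question's types and calls; the sketch assumes reactor-core on the classpath.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

public class ReadablePipeline {

    // Hypothetical, simplified stand-in for the (Masterdata_A, MasterdataRecord_B) pair.
    static final class Context {
        final String masterdataA;
        final int recordB;

        Context(String masterdataA, int recordB) {
            this.masterdataA = masterdataA;
            this.recordB = recordB;
        }
    }

    // Hypothetical webservice calls.
    static Mono<String> fetchDataA(String masterdataA) {
        return Mono.just("dataA(" + masterdataA + ")");
    }

    static Mono<String> fetchDataB(int recordB) {
        return Mono.just("dataB(" + recordB + ")");
    }

    // One named step: fetch the extra data for one context and build the result.
    // ctx is in scope inside map, so the masterdata is available when building the result.
    static Mono<String> buildResult(Context ctx) {
        return Mono.zip(fetchDataA(ctx.masterdataA), fetchDataB(ctx.recordB))
                .map(data -> ctx.masterdataA + "/" + ctx.recordB + ": " + data.getT1() + ", " + data.getT2());
    }

    // The top-level pipeline stays flat and reads as a sequence of named steps.
    static Flux<String> pipeline(Flux<Context> contexts, int maxConcurrentCalls) {
        return contexts.flatMap(ReadablePipeline::buildResult, maxConcurrentCalls);
    }

    static List<String> demo() {
        return pipeline(Flux.just(new Context("A", 1), new Context("A", 2)), 5)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Adding another data source later then means adding one more `fetch…` method and one more argument to `Mono.zip` inside `buildResult`, while the top-level chain stays unchanged.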

  • Complete example

    Mono<Masterdata_A> monoMasterdata_a = webservice.getMonoMasterdata_A();
    Flux<MasterdataRecord_B> masterdataRecordBFlux = webservice.getFluxMasterdata_B();

    // Suppose getMonoMasterdata_A returns just "A" and getFluxMasterdata_B returns [1, 2, 3, ...];
    // then, if A arrives before the first B record, the result will be [(A,1), (A,2), (A,3), ...].
    // Both sources are subscribed at the same time, so the two calls run in parallel.
    // Note: B records emitted before A arrives are dropped by combineLatest.
    Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)
            // Optional rate throttling; 500 ms is an arbitrary value, test and tune it for these services.
            .delayElements(Duration.ofMillis(500))
            .flatMap((Tuple2<Masterdata_A, MasterdataRecord_B> tuple2) -> {
                        Masterdata_A masterdataA = tuple2.getT1();
                        MasterdataRecord_B masterdataB = tuple2.getT2();
                        Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
                        Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
                        Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
                        // Wait for the results of all Monos; the masterdata is zipped in as well
                        // so that it is still available when building the ProcessingResult.
                        return Mono.zip(Mono.just(masterdataA), Mono.just(masterdataB), monoA, monoB, monoC);
                    },
                    // At most 5 tuples are processed concurrently; 5 is arbitrary, test to find the best value.
                    5)
            .map(data -> {
                // The mapping is synchronous, so map is enough; flatMap is only needed
                // when the function itself returns a Publisher.
                Masterdata_A masterdataA = data.getT1();
                MasterdataRecord_B masterdataB = data.getT2();
                Data_A dataA = data.getT3();
                Data_B dataB = data.getT4();
                Data_C dataC = data.getT5();

                // create result from masterdataA, masterdataB, dataA, dataB, dataC
                ProcessingResult result = ???;
                return result;
            })
            // It is usually better to save in batches.
            // 100 is an arbitrary value; choose one that suits your datasource.
            .bufferTimeout(100, Duration.ofMillis(100))
            // concatMap saves one batch at a time. batchSave is a blocking call, so it is wrapped in
            // Mono.fromRunnable (which defers the call until subscription) and run on boundedElastic.
            .concatMap(processingResults -> Mono.fromRunnable(() -> batchSave(processingResults))
                    .subscribeOn(Schedulers.boundedElastic()))
            .subscribe();
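The batching and blocking-save part can be tried in isolation with the sketch below (assuming reactor-core on the classpath). `blockingBatchSave` is a hypothetical placeholder for a blocking call such as a JDBC batch insert; here it only records the batch size and the thread it ran on. Wrapping it in `Mono.fromRunnable` defers the call until subscription, which is what lets `subscribeOn(Schedulers.boundedElastic())` move it onto a thread meant for blocking work.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BatchedBlockingSave {

    // What the hypothetical save call observed for one batch.
    static final class SavedBatch {
        final int size;
        final String thread;

        SavedBatch(int size, String thread) {
            this.size = size;
            this.thread = thread;
        }
    }

    // Hypothetical blocking persistence call (e.g. a JDBC batch insert).
    static void blockingBatchSave(List<Integer> batch, List<SavedBatch> log) {
        log.add(new SavedBatch(batch.size(), Thread.currentThread().getName()));
    }

    static List<SavedBatch> run(int elements) {
        List<SavedBatch> log = Collections.synchronizedList(new ArrayList<>());
        Flux.range(1, elements)
                // emit a batch when 100 items are collected or 100 ms have passed
                .bufferTimeout(100, Duration.ofMillis(100))
                // save one batch at a time; the blocking call runs on boundedElastic
                .concatMap(batch -> Mono.fromRunnable(() -> blockingBatchSave(batch, log))
                        .subscribeOn(Schedulers.boundedElastic()))
                .then()
                .block();
        return log;
    }

    public static void main(String[] args) {
        for (SavedBatch b : run(250)) {
            System.out.println(b.size + " items saved on " + b.thread);
        }
    }
}
```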