Tags: mongodb, reactive-programming, spring-data-mongodb, project-reactor, spring-data-mongodb-reactive

Managing the lifecycle of multiple Flux with spring-data-mongodb-reactive


I have a data service and I am seriously considering switching to a reactive model. This is a federated query engine that can resolve data for queries by invoking one or more "resolver" implementations, depending on the query type.

If I switch to spring-data-mongodb-reactive, then each of these implementations would have to create a number of Flux instances for:

  1. the queries for different parts of the information
  2. querying all databases for each query from #1

Note: I don't want to combine every Flux, because keeping the queries for #1 above separate makes the final processing much easier. Combining each "part" query across all federated databases would be fine, but I have to keep the data for each "part" separate. I hope that makes sense.

Explaining the full workflow is out of the scope of this post, but I am wondering how I can create any number of Flux instances, subscribe to them to get them started, and then wait until they all complete before processing the fully retrieved data across all federated sources. In Java terms, I am looking for something analogous to CompletableFuture.allOf().
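For reference, the CompletableFuture.allOf() behaviour meant here can be sketched roughly as follows. This is only an illustration of the "start everything, wait for all, then process" pattern using the JDK; the class AllOfSketch and the methods queryPart and queryAllParts are hypothetical names, and the queries are simulated with fixed values rather than real database calls.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AllOfSketch {

    // Hypothetical stand-in for one "part" query; a real one would hit a database.
    static CompletableFuture<List<String>> queryPart(String part) {
        return CompletableFuture.supplyAsync(() -> List.of(part + ".1", part + ".2"));
    }

    // Start every part query, wait for all of them, then return the results per part.
    static List<List<String>> queryAllParts() {
        CompletableFuture<List<String>> part1 = queryPart("1");
        CompletableFuture<List<String>> part2 = queryPart("2");
        CompletableFuture<List<String>> part3 = queryPart("3");

        // allOf completes only once every future has completed; it carries no values itself.
        CompletableFuture.allOf(part1, part2, part3).join();

        // Every future is already done at this point, so join() returns immediately.
        return List.of(part1.join(), part2.join(), part3.join());
    }

    public static void main(String[] args) {
        System.out.println(queryAllParts());
    }
}
```

The results of each part stay in their own list, which is the property I want to preserve in the reactive version.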

Am I even close to being on the right track if I do something like this:

public class ReactiveDataService {
    private static final Supplier<Example<String>> example1 = () -> Example.of("Example 1");
    private static final Supplier<Example<String>> example2 = () -> Example.of("Example 2");
    private static final Supplier<Example<String>> example3 = () -> Example.of("Example 3");
    private static final Supplier<Example<String>> example4 = () -> Example.of("Example 4");
    private static final Supplier<Example<String>> example5 = () -> Example.of("Example 5");
    private final Collection<ReactiveMongoRepository<String, String>> repositories;

    public ReactiveDataService(Collection<ReactiveMongoRepository<String, String>> repositories) {
        this.repositories = repositories;
    }

    private void processFluxes(final Flux<String> flux1, final Flux<String> flux2, final Flux<String> flux3,
                               final Flux<String> flux4, final Flux<String> flux5) {
        // Call service to process flux stuff
    }

    /**
     * For all repositories, combine fluxes that run the same query.
     * Subscribe to each flux immediately to get the query started.
     * Add all fluxes to a container flux that processes the results
     * upon completion.
     * After everything is set up, block until completion.
     */
    public void doQuery() {
        final Flux<String> flux1 = Flux.fromIterable(repositories)
                .flatMap(repo -> repo.findAll(example1.get()));
        flux1.subscribe();

        final Flux<String> flux2 = Flux.fromIterable(repositories)
                .flatMap(repo -> repo.findAll(example2.get()));
        flux2.subscribe();

        final Flux<String> flux3 = Flux.fromIterable(repositories)
                .flatMap(repo -> repo.findAll(example3.get()));
        flux3.subscribe();

        final Flux<String> flux4 = Flux.fromIterable(repositories)
                .flatMap(repo -> repo.findAll(example4.get()));
        flux4.subscribe();

        final Flux<String> flux5 = Flux.fromIterable(repositories)
                .flatMap(repo -> repo.findAll(example5.get()));
        flux5.subscribe();

        final Flux<Flux<String>> fluxes = Flux.just(flux1, flux2, flux3, flux4, flux5)
                .doOnComplete(() -> processFluxes(flux1, flux2, flux3, flux4, flux5));
        fluxes.blockLast();
    }
}

Solution

  • Here is an example of how you can do this using Mono.zip:

        public static void main(String[] args) {
            Flux<String> flux0 = Flux.empty();
            Flux<String> flux1 = Flux.just("1.1", "1.2", "1.3");
            Flux<String> flux2 = Flux.just("2.1", "2.2", "2.3");
            Flux<String> flux3 = Flux.just("3.1", "3.2", "3.3");
            
            // Collect each Flux into a Mono<List<String>>, then block until all of them are done.
            Mono.zip(lists -> process(lists),
                    flux0.collectList(),
                    flux1.collectList(),
                    flux2.collectList(),
                    flux3.collectList())
                .block();
        }
        
        private static String process(Object[] lists) {
            System.out.println("List 0 is " + lists[0]);
            System.out.println("List 1 is " + lists[1]);
            System.out.println("List 2 is " + lists[2]);
            System.out.println("List 3 is " + lists[3]);
            return "output";
        }
    

    which outputs:

    List 0 is []
    List 1 is [1.1, 1.2, 1.3]
    List 2 is [2.1, 2.2, 2.3]
    List 3 is [3.1, 3.2, 3.3]
    

    So you can adapt it to your situation.

    Note that the combinator function passed to Mono.zip must not return null, which is why process returns "output". If you don't need a result, return any non-null value.

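    The idea is to first convert every Flux<String> into a Mono<List<String>> with collectList, which makes the results simpler to process. collectList emits an empty list for an empty Flux (see List 0 in the output), so an empty query does not stop Mono.zip from emitting. Mono.zip waits until all of the Monos have produced their value and then passes the results to the combinator as an Object[]. You can cast each element to List<String> for processing.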
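If the Object[] casts are a concern, a typed variant is possible: the Mono.zip overloads that take between two and eight Mono arguments return a typed tuple instead. The following is a minimal sketch under stated assumptions, not a drop-in implementation. The class TypedZipSketch, the methods query, collectPart and queryAllParts, and the source names "db1" and "db2" are all hypothetical; query stands in for a call such as repo.findAll(example) on a real repository.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple3;

public class TypedZipSketch {

    // Hypothetical stand-in for one federated source; a real resolver would
    // query a ReactiveMongoRepository here instead of returning fixed values.
    static Flux<String> query(String source, String part) {
        return Flux.just(source + ":" + part + ".1", source + ":" + part + ".2");
    }

    // Merge one "part" query across all sources and collect it into a single list.
    static Mono<List<String>> collectPart(List<String> sources, String part) {
        return Flux.fromIterable(sources)
                .flatMap(source -> query(source, part))
                .collectList();
    }

    // Nothing runs until the returned Mono is subscribed to. On subscription,
    // zip subscribes to all three part queries and emits one tuple once each
    // of them has produced its list.
    static Mono<Tuple3<List<String>, List<String>, List<String>>> queryAllParts(List<String> sources) {
        return Mono.zip(
                collectPart(sources, "a"),
                collectPart(sources, "b"),
                collectPart(sources, "c"));
    }

    public static void main(String[] args) {
        Tuple3<List<String>, List<String>, List<String>> parts =
                queryAllParts(List.of("db1", "db2")).block();

        // Each part keeps its own list, already merged across all sources.
        System.out.println("Part a: " + parts.getT1());
        System.out.println("Part b: " + parts.getT2());
        System.out.println("Part c: " + parts.getT3());
    }
}
```

With this shape there is no need to call subscribe() on each Flux beforehand: subscribing once to the zipped Mono (here via block()) starts all of the queries, and each "part" stays separate in its own tuple slot.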