Search code examples
javareactive-programmingmutinyquarkus-reactive

Process items in Uni<List> asynchronously


I have a Uni

Uni<List<String>> list = Uni.createFrom().item(List.of("a", "b", "c"));

I would like to map each item in this list to a new Uni and then process them asynchronously.

I have tried following:

list.map(strings -> strings.stream().map(this::processItem).toList())
                .flatMap(unis -> Uni.join().all(unis).andCollectFailures())
                .subscribe()
                .with(System.out::println);

    private Future<String> process(String s) {
        return executor.submit(() -> {
            System.out.println("start " + s);
            Thread.sleep(5000);
            System.out.println("end " + s);
            return s;
        });
    }

    private Uni<Void> processItem(String item) {
        return Uni.createFrom().future(process(item))
                .replaceWithVoid();
    }

However, this only processes the first element of the list. How can I map a Uni to a List and process each Uni asynchronously?


Solution

  • .subscribe().with(System.out::println); is asynchronous, if you don't add some check to make sure that the stream is over, the program will exit before the first result is available.

    I usually work with Vert.x unit:

        @Test
        public void test(TestContext context) {
            Random random = new Random();
            // This is only used to signal the test
            // when the stream is terminated
            Async async = context.async();
    
            Uni.createFrom()
                    .item( List.of( "a", "b", "c" ) )
                    // Convert the Uni<List<String>> into a Multi<String>
                    .onItem().transformToMulti( Multi.createFrom()::iterable )
                    .onItem().transformToUniAndMerge( s ->
                              // You can create a uni the way you prefer
                              Uni.createFrom().item( s )
                                      // Add some random duration
                                      .onItem().delayIt().by( Duration.ofMillis( (random.nextInt(5) + 1) * 1000 ) )
                    )
                    // End the test only when the stream has terminated
                    .onTermination().invoke( (throwable, aBoolean) -> {
                        if (throwable != null ) {
                            context.fail(throwable);
                        }
                        else {
                            async.complete();
                        }
                    } )
                    .subscribe()
                    .with( s -> System.out.println("Printing: " + s) );
        }
    

    a while-loop should work as well.

    Or, you could collect the results:

            Random random = new Random();
            Uni.createFrom()
                    .item( List.of( "a", "b", "c" ) )
                    // Convert the Uni<List<String>> into a Multi<String>
                    .onItem().transformToMulti( Multi.createFrom()::iterable )
                    .onItem().transformToUniAndMerge( s -> {
                              final int duration = (random.nextInt( 5 ) + 1) * 1000;
                              // You can create a uni the way you prefer
                              return Uni.createFrom().item( s )
                                      // Add some random duration
                                      .onItem().delayIt().by( Duration.ofMillis( duration ) )
                                      .invoke( () -> System.out.println("Letter: " + s + ", duration in ms: " + duration) );
                    } )
                    .onItem().invoke( s -> System.out.println("Printing: " + s) )
                    .collect().asList().await().indefinitely();