I have a Uni:
Uni<List<String>> list = Uni.createFrom().item(List.of("a", "b", "c"));
I would like to map each item in this list to a new Uni and then process them asynchronously.
I have tried the following:
list.map(strings -> strings.stream().map(this::processItem).toList())
    .flatMap(unis -> Uni.join().all(unis).andCollectFailures())
    .subscribe()
    .with(System.out::println);
private Future<String> process(String s) {
    return executor.submit(() -> {
        System.out.println("start " + s);
        Thread.sleep(5000);
        System.out.println("end " + s);
        return s;
    });
}

private Uni<Void> processItem(String item) {
    return Uni.createFrom().future(process(item))
        .replaceWithVoid();
}
However, this only processes the first element of the list. How can I map each element of the list to a Uni and process all of them asynchronously?
.subscribe().with(System.out::println);
is asynchronous. If you don't add some check that waits until the stream has terminated, the program will exit before the first result is available.
I usually test this with Vert.x Unit:
@Test
public void test(TestContext context) {
    Random random = new Random();
    // This is only used to signal the test
    // when the stream is terminated
    Async async = context.async();
    Uni.createFrom()
        .item( List.of( "a", "b", "c" ) )
        // Convert the Uni<List<String>> into a Multi<String>
        .onItem().transformToMulti( Multi.createFrom()::iterable )
        .onItem().transformToUniAndMerge( s ->
            // You can create a uni the way you prefer
            Uni.createFrom().item( s )
                // Add some random duration
                .onItem().delayIt().by( Duration.ofMillis( (random.nextInt(5) + 1) * 1000 ) )
        )
        // End the test only when the stream has terminated
        .onTermination().invoke( (throwable, aBoolean) -> {
            if ( throwable != null ) {
                context.fail( throwable );
            }
            else {
                async.complete();
            }
        } )
        .subscribe()
        .with( s -> System.out.println("Printing: " + s) );
}
A while-loop that waits until the stream has terminated should work as well.
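To make the while-loop idea concrete, here is a minimal sketch that uses only the JDK. It swaps in `CompletableFuture` as a stand-in for the Mutiny pipeline, so it shows the waiting pattern rather than the Mutiny API itself. The class `WaitLoopSketch`, the helper `processAll`, and the upper-casing "work" are all hypothetical names picked for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class WaitLoopSketch {

    // Hypothetical helper: starts one asynchronous task per item and keeps
    // the caller in a while-loop until every task has terminated.
    static List<String> processAll(List<String> items) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, items.size()));
        List<String> results = new CopyOnWriteArrayList<>();
        AtomicInteger pending = new AtomicInteger(items.size());
        try {
            for (String item : items) {
                CompletableFuture
                        // Stand-in for the real asynchronous work (an assumption)
                        .supplyAsync(() -> item.toUpperCase(), executor)
                        // Runs when the task ends, whether it succeeded or failed
                        .whenComplete((value, failure) -> {
                            if (failure == null) {
                                results.add(value);
                            }
                            pending.decrementAndGet();
                        });
            }
            // Without this loop the caller would return before any result exists
            while (pending.get() > 0) {
                Thread.sleep(10);
            }
        } finally {
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        // The order of the printed elements depends on thread scheduling
        System.out.println(processAll(List.of("a", "b", "c")));
    }
}
```

With Mutiny, the same counter (or flag) would be updated from the subscriber callbacks instead of `whenComplete`. A `CountDownLatch` would probably be a tidier choice than polling, but the loop matches the suggestion above most directly.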
Or, you could collect the results:
Random random = new Random();
Uni.createFrom()
    .item( List.of( "a", "b", "c" ) )
    // Convert the Uni<List<String>> into a Multi<String>
    .onItem().transformToMulti( Multi.createFrom()::iterable )
    .onItem().transformToUniAndMerge( s -> {
        final int duration = (random.nextInt( 5 ) + 1) * 1000;
        // You can create a uni the way you prefer
        return Uni.createFrom().item( s )
            // Add some random duration
            .onItem().delayIt().by( Duration.ofMillis( duration ) )
            .invoke( () -> System.out.println("Letter: " + s + ", duration in ms: " + duration) );
    } )
    .onItem().invoke( s -> System.out.println("Printing: " + s) )
    .collect().asList().await().indefinitely();