I am working on a reactive Quarkus backend service which does the following.
I am using Mutiny for the reactive pipeline. The remote service and database integrations each work fine individually in a non-blocking way. I just need help writing a pipeline that links them. For example, something like the sketch below:
public Uni<List<Post>> findAllBooks() {
    return Book.listAll() // Entity returns Uni<List<Book>> successfully.
        .map(Collection::stream)
        .flatMap(book -> postApiClient.getPost(book.getId())) // Reactive postApiClient returns Uni<Post> successfully.
        .collect(toList());
}
I am stuck on processing a Uni that wraps a List and then processing the individual items in that list. Either Uni<List> or Multi would work fine for me. I just want it to be non-blocking throughout.
I managed to achieve the objective by writing the following pipeline.
I wasn't worried about order, so I used transformToUniAndMerge. If you need to preserve the order of the original list, use transformToUniAndConcatenate instead.
public Multi<String> hello() {
    return Uni.createFrom().item(List.of("hello", "Django", "Hola")) // A Uni wrapping a List, as a PanacheEntity query would return
        .onItem().transformToMulti(x -> Multi.createFrom().iterable(x)) // Transform into a Multi by passing the iterable
        .onItem().transformToUniAndMerge(this::mockRemoteService); // Invoke the non-blocking remote service, which returns a Uni
}

private Uni<String> mockRemoteService(String item) {
    return Uni.createFrom().item((item + " mutiny").toUpperCase());
}
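
If a Uni<List> is preferred over a Multi, as the question allows, the same pipeline can probably be collected back into a list. The following is a minimal sketch, not a definitive implementation. It uses transformToUniAndConcatenate so the result keeps the order of the input list. The class name OrderedPipeline and the method helloAsList are hypothetical names used only for illustration, and mockRemoteService stands in for the real remote client.

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import java.util.List;

public class OrderedPipeline {

    // Hypothetical stand-in for a non-blocking remote call that returns a Uni.
    static Uni<String> mockRemoteService(String item) {
        return Uni.createFrom().item((item + " mutiny").toUpperCase());
    }

    // Same shape as the pipeline above, but order-preserving and collected into a Uni<List<String>>.
    static Uni<List<String>> helloAsList() {
        return Uni.createFrom().item(List.of("hello", "Django", "Hola"))
                // Unwrap the list into a stream of individual items.
                .onItem().transformToMulti(list -> Multi.createFrom().iterable(list))
                // Call the remote service one item at a time, keeping the input order.
                .onItem().transformToUniAndConcatenate(OrderedPipeline::mockRemoteService)
                // Gather the results back into a single list.
                .collect().asList();
    }

    public static void main(String[] args) {
        // Blocking with await() is only for this demo; in a Quarkus endpoint, return the Uni instead.
        System.out.println(helloAsList().await().indefinitely());
        // prints [HELLO MUTINY, DJANGO MUTINY, HOLA MUTINY]
    }
}
```

Concatenation subscribes to one inner Uni at a time, so it trades some concurrency for ordering. When order does not matter, swapping in transformToUniAndMerge keeps the rest of the sketch unchanged.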