Search code examples
javaquarkusmutinysmallrye

How to invoke asynch Operations after synch Operations with Quarkus Smallrye Mutiny?


I need to run some synchronous code followed by asynchronous code in Quarkus. The documentation sounds promising https://smallrye.io/smallrye-mutiny/2.2.0/guides/imperative-to-reactive/, but it seems to be dated since Uni.createFrom() doesn't return a Uni but an UniCreate object and some method names seem to have changed since the recording of the Quarkus Insights talk.

I've tried several variations without success. It sounds like the method 'transform' could help, but I haven't found samples how to use it in this context. Other options return "thread must not be blocked".

I'm looking for something like this:

public Multi<MyReponseMulti> queryAsStream(String query) {
    return Uni.createFrom().
        item(() -> {
            MyReponseUni myResponseUni = this.invokeRemoteServiceUsingBlockingIO(query);
            return this.invokeRemoteServiceUsingReactive(myResponseUni);
        });
        .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
        .subscribe().asCompletionStage();
}

Right now the code above has a compilation error:

Type mismatch: cannot convert from CompletableFuture<Multi<MyReponseMulti>> to Multi<MyReponseMulti>

Could someone please help? Thanks!


Solution

  • I believe you need something like this:

    public Multi<MyReponseMulti> queryAsStream(String query) {
        // First, create a Uni for the blocking operation
        var blockingOp = Uni.createFrom().item(() -> {
          return this.invokeRemoteServiceUsingBlockingIO(query);
        }).runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
        // Now, transform the outcome of this uni to the Multi 
        return blockingOp
           .onItem().transformToMulti(myResponseUni -> {
                // I guess this method returns a Multi
                return this.invokeRemoteServiceUsingReactive(myResponseUni);
            });
    }