java reactive-programming quarkus smallrye mutiny

How can one return a value directly from Uni<T> without converting it into a CompletableFuture?


To give some context: I have two methods in my class, both of which return a Uni. The first method depends on the second in the following manner:

public Uni<String> greeting(String name) {
    log.info("\n\n");
    log.info("\t`greeting(String name)` Executing on Thread {}",Thread.currentThread().getName());

    return Uni
            .createFrom()
            .item(name) // synchronous here; if the value came from an I/O call you would pass a supplier instead, ref README.md#Links.1
            .emitOn(emitExecutor)
            .onItem()
            .transform(parameter-> {
                log.debug("`(p)>Transform` invoked on Thread {}",Thread.currentThread().getName());
                assert Thread.currentThread().getName().equals(threadName);
                try {
                    return ioSimulation(parameter,Thread.currentThread().getName()).subscribeAsCompletionStage().get();
                } catch (InterruptedException | ExecutionException e) {
                    log.error("failed to execute ioSimulation due to {}",e.getMessage());
                    // keep the original exception as the cause so the stack trace is not lost
                    throw new RuntimeException("failed to communicate with client: " + e.getMessage(), e);
                }
            })
            .onFailure()
            .retry()
            .atMost(2);
}


public Uni<String> ioSimulation(String param,String threadName){
        log.debug("`ioSimulation(String param)` Executing on Thread {}",Thread.currentThread().getName());
        assert Thread.currentThread().getName().equals(threadName);
        return MockServer.client
                .getAbs("http://localhost:80")
                .addQueryParam("name",param)
                .send()
                .onItem().transform(response-> {
                    if (response.statusCode() == 200){
                        return response.bodyAsString();
                    }else{
                        throw  new IllegalStateException(response.bodyAsString());
                    }
        });
}

Now, in greeting(String name) (the first method), I had to use subscribeAsCompletionStage().get() to return the String value; otherwise the return type would be Uni<Uni<String>>.

My questions are as follows:

  1. Is there a way to return the actual result of the second method ioSimulation (the exact value wrapped inside the Uni) without using subscribeAsCompletionStage().get()? If yes, how can I do that?
  2. Is there a better way of doing this, such as refactoring the existing methods?

If anyone is interested, the complete code can be found here.


Solution

  • Thanks to @Ladicek, there is an out-of-the-box way to handle an asynchronous Uni<T> returned inside a transform block. The solution is:

    public Uni<String> greeting(String name) {
        log.info("\n\n");
        log.info("\t`greeting(String name)` Executing on Thread {}",Thread.currentThread().getName());
    
        return Uni
                .createFrom()
                .item(name) // synchronous here; if the value came from an I/O call you would pass a supplier instead, ref README.md#Links.1
                .emitOn(emitExecutor)
                .onItem()
                .transformToUni(parameter -> ioSimulation(parameter,Thread.currentThread().getName()))
                .onFailure()
                .retry()
                .atMost(2);
    
    }
    
    
    public Uni<String> ioSimulation(String param,String threadName){
        log.debug("`ioSimulation(String param)` Executing on Thread {}",Thread.currentThread().getName());
        assert Thread.currentThread().getName().equals(threadName);
        return MockServer.client
                .getAbs("http://localhost:80")
                .addQueryParam("name",param)
                .send()
                .onItem().transform(response-> {
                    if (response.statusCode() == 200){
                        return response.bodyAsString();
                    }else{
                        throw  new IllegalStateException(response.bodyAsString());
                    }
        });
    
    }
    

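    transformToUni transforms the received item asynchronously: the function you pass returns a Uni<T>, and the resulting Uni emits that inner Uni's item, so no nested Uni<Uni<T>> is produced.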
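For readers coming from CompletableFuture, the difference between transform and transformToUni is roughly the difference between thenApply and thenCompose. The sketch below illustrates that analogy using only the JDK; FlattenSketch and fakeIo are hypothetical names standing in for the real class and for ioSimulation, and none of it is Mutiny API.

```java
import java.util.concurrent.CompletableFuture;

public class FlattenSketch {

    // Hypothetical stand-in for ioSimulation: an asynchronous call that yields a String.
    static CompletableFuture<String> fakeIo(String name) {
        return CompletableFuture.supplyAsync(() -> "Hello " + name);
    }

    // Analogue of onItem().transform(...): the future returned by the mapper is
    // not flattened, so the result is nested, much like Uni<Uni<String>>.
    static CompletableFuture<CompletableFuture<String>> nested(String name) {
        return CompletableFuture.completedFuture(name).thenApply(FlattenSketch::fakeIo);
    }

    // Analogue of onItem().transformToUni(...): the inner future is flattened
    // into the outer one, so callers see a plain CompletableFuture<String>.
    static CompletableFuture<String> flattened(String name) {
        return CompletableFuture.completedFuture(name).thenCompose(FlattenSketch::fakeIo);
    }

    public static void main(String[] args) {
        // join() blocks; it is used here only so the demo can print a result.
        System.out.println(flattened("Mutiny").join());
    }
}
```

In both libraries the flattening operator keeps the pipeline asynchronous, so there is no need to block inside the mapping function.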

    Update:

    One more way of doing it (the one I was looking for) is to use a callback.

    public Uni<String> callbackGreeting(String name) {
        log.info("\n\n");
        log.info("\t`callbackGreeting(String name)` Executing on Thread {}", Thread.currentThread().getName());
    
        return Uni
                .createFrom()
                .item(name) // synchronous here; if the value came from an I/O call you would pass a supplier instead, ref README.md#Links.1
                .emitOn(emitExecutor)
                .onItem()
                .transformToUni(param -> {
                    return Uni.createFrom().emitter(em -> {
                        ioSimulation(param, Thread.currentThread().getName())
                                .subscribe()
                                .with(success -> em.complete(success), // ioSimulation already emits the body as a String; pass it to the emitter callback
                                        exception -> {
                                            log.info("the following error occurred before sending to em.complete {}", exception.getMessage());
                                            em.fail(exception);
                                        });
    
                    });
                })
                .onFailure()
                .retry()
                .atMost(2)
                .map(item -> "Operation completed with item " + item);
    
    }
    

    This example is for demonstration purposes only; I understand this is not an optimal solution. The complete code is here.
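The same callback-bridging idea can be sketched with the JDK alone, which may help if the emitter version feels opaque: create an empty future, then complete or fail it from the callbacks. The class CallbackBridgeSketch and the callback-style fetch method are hypothetical and exist only for illustration; they are not part of the original code or of Mutiny.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class CallbackBridgeSketch {

    // Hypothetical callback-style API: reports either a result or a failure.
    static void fetch(String name, Consumer<String> onSuccess, Consumer<Throwable> onFailure) {
        if (name == null || name.isEmpty()) {
            onFailure.accept(new IllegalArgumentException("name must not be empty"));
        } else {
            onSuccess.accept("Hello " + name);
        }
    }

    // Bridges the callbacks into a future, playing the role that
    // em.complete(...) and em.fail(...) play in the emitter-based version.
    static CompletableFuture<String> greeting(String name) {
        CompletableFuture<String> result = new CompletableFuture<>();
        fetch(name, result::complete, result::completeExceptionally);
        return result;
    }

    public static void main(String[] args) {
        // join() blocks; it is used here only so the demo can print a result.
        System.out.println(greeting("Mutiny").join());
    }
}
```

When the source is already a Uni, as ioSimulation is here, transformToUni alone is the simpler choice; an emitter is mainly useful for wrapping APIs that only offer callbacks.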