Tags: java, spring, scala, akka, reactor

Spring Boot Controller returning a Mono of a scala.concurrent.Future


I am running an Akka actor system inside a Spring Boot application, with a set of actors running.

From my Controller class I call my service class that, using the Actor ask pattern, sends a message to an actor and expects a response. Below is the service method code:

public Mono<Future<SportEventDetails>> getEventBySportAndLeagueId(Integer sportId, Integer leagueId) {
    final ActorSelection actorSelection = bootstrapAkka.getActorSystem().actorSelection("/user/some/path");
    final ActorMessage message = new ActorMessage();

    final CompletionStage<Future<SportEventDetails>> futureCompletionStage = actorSelection.resolveOne(Duration.ofSeconds(2))
            .thenApplyAsync(actorRef ->
                        Patterns.ask(actorRef, message, 1000)
                        .map(v1 -> (SportEventDetails) v1, ExecutionContext.global())
                )
                .whenCompleteAsync((sportEventDetailsFuture, throwable) -> {
                    // Here sportEventDetailsFuture is of type scala.concurrent.Future
                    sportEventDetailsFuture.onComplete(v1 -> {
                        final SportEventDetails eventDetails = v1.get();
                        log.info("Thread: {} | v1.get - onComplete - SED: {}", Thread.currentThread(), eventDetails);
                        return eventDetails;
                    }, ExecutionContext.global());
                });

    return Mono.fromCompletionStage(futureCompletionStage);
}

While the controller code is as simple as

@GetMapping(path = "{sportId}/{leagueId}")
public Mono<Future<SportEventDetails>> getEventsBySportAndLeagueId(@PathVariable("sportId") Integer sportId, @PathVariable("leagueId") Integer leagueId) {
    return eventService.getEventBySportAndLeagueId(sportId, leagueId);
}

When a client calls this endpoint it either gets {"success":true,"failure":false} or null (as a string).

I suspect the null response occurs because the scala.concurrent.Future has not completed before the response is sent to the client, but I don't understand why it would not complete in time: I assumed the Mono would wait for the future to complete.

The issue here is that Patterns.ask returns a scala.concurrent.Future<SportEventDetails> and I could not find a way to convert the scala Future to a Java CompletableFuture<SportEventDetails> or CompletionStage<SportEventDetails>.

So, my question is: how can I return the JSON representation of SportEventDetails to the client when using Akka's Patterns.ask(...) model?


Solution

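  • Future, Mono and CompletionStage are three implementations of the same concept: a value that may or may not be here yet. In your code the Mono only waits for the outer CompletionStage; the Future it carries is handed to the JSON serializer as a plain object, whether or not it has completed. You will need a way to transform the types into the same type and then a way to "flatten" the nested type. Mono.fromCompletionStage is one such transformation: it turns a CompletionStage into a Mono.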
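
    As a rough illustration of the "flatten" step only, here is a minimal sketch using nothing but the JDK. It assumes the inner value has already been converted to a CompletionStage; the class name FlattenExample and the flatten helper are made up for this example and are not part of Akka or Reactor. Reactor's Mono offers flatMap for the same purpose.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical example class, not taken from the question's code base.
class FlattenExample {

    // Collapses CompletionStage<CompletionStage<T>> into CompletionStage<T>.
    // thenCompose waits for the outer stage, then for the inner stage it produced.
    static <T> CompletionStage<T> flatten(CompletionStage<CompletionStage<T>> nested) {
        return nested.thenCompose(inner -> inner);
    }

    public static void main(String[] args) {
        // The outer stage is already complete, but the inner one is not.
        CompletableFuture<String> inner = new CompletableFuture<>();
        CompletionStage<CompletionStage<String>> nested =
                CompletableFuture.completedFuture(inner);

        CompletableFuture<String> flat = flatten(nested).toCompletableFuture();
        System.out.println(flat.isDone()); // false: still waiting for the inner stage

        inner.complete("event details");
        System.out.println(flat.join());   // event details
    }
}
```

    The flattened stage completes only once the inner one does, which is the behaviour the question expected from the Mono.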

    The easiest approach is to avoid getting a Future, and therefore the flattening, entirely:

    In more recent Akka versions (2.5.19 or newer), there are ask overloads taking a java.time.Duration timeout that return a CompletionStage (of Object, so the reply still needs a cast to SportEventDetails). There are also ask overloads that take an ActorSelection, so you do not have to resolve first and then ask once the resolve completes:

    CompletionStage<SportEventDetails> futureSportEventDetails =
      Patterns.ask(selection, message, Duration.ofSeconds(3))
        .thenApply(reply -> (SportEventDetails) reply); // ask yields CompletionStage<Object>
    return Mono.fromCompletionStage(futureSportEventDetails);
    

    In older versions of Akka (2.4.2 and later, I think), you should be able to find similar signatures in akka.pattern.PatternsCS.

    If you are on an even older version and cannot upgrade, you will probably have to provide your own converter method from Future<T> to CompletionStage<T> or Mono<T>: one that registers an onComplete listener on the future and completes an instance of the destination type.
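
    The shape of such a converter could look roughly like the sketch below. To keep it compilable without Scala on the classpath, it uses a hypothetical CallbackFuture interface as a stand-in for scala.concurrent.Future; the names ConverterSketch, CallbackFuture and toCompletionStage are all invented for illustration. With a real Scala Future the listener would receive a scala.util.Try and onComplete would also need an ExecutionContext.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;

// Hypothetical example class, not part of Akka, Scala or Reactor.
class ConverterSketch {

    // Stand-in (assumption) for a callback-based future such as scala.concurrent.Future.
    // The listener receives (value, null) on success or (null, error) on failure.
    interface CallbackFuture<T> {
        void onComplete(BiConsumer<T, Throwable> listener);
    }

    // Registers a listener on the source and completes a CompletableFuture from it.
    static <T> CompletionStage<T> toCompletionStage(CallbackFuture<T> source) {
        CompletableFuture<T> target = new CompletableFuture<>();
        source.onComplete((value, error) -> {
            if (error != null) {
                target.completeExceptionally(error);
            } else {
                target.complete(value);
            }
        });
        return target;
    }

    public static void main(String[] args) {
        // A source that completes immediately, for demonstration only.
        CallbackFuture<String> source = listener -> listener.accept("event details", null);
        System.out.println(toCompletionStage(source).toCompletableFuture().join());
    }
}
```

    The resulting CompletionStage can then be passed to Mono.fromCompletionStage. If adding a dependency is an option, the scala-java8-compat library ships a ready-made FutureConverters.toJava for this conversion, which may be preferable to hand-rolling it.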