java scala exception akka actor

Akka recover not working? Still throwing exceptions even though everything executing okay


I have an actor that looks like the one below. When I call Await.result on the result, I still get the IllegalStateException, even though I'm using recoverWith. I can see the System.out.println output, so I know the recovery is composed correctly and runs. (This is also harder in Java than it is in Scala!)

What's going on? I thought recoverWith meant the future would no longer fail with the exception.

public void onReceive(final Object message) throws Exception {
        if (message instanceof MyMessage) {
            final String key = ((messages.MyMessage) message).getKey();

            F.Promise<T> promise = asyncService.get(key);

            promise.wrapped().recoverWith(new Recover<Future<T>>() {
                @Override
                public Future<T> recover(Throwable failure) throws Throwable {
                    if (failure instanceof IllegalStateException) {
                        Future future = Patterns.ask(serviceActor, key, timeout);
                        future.onSuccess(new OnSuccess() {
                            @Override
                            public void onSuccess(Object result) throws Throwable {
                                System.out.println("Here");
                            }
                        }, context().dispatcher());
                        return future;
                    } else {
                        throw failure; //there is actually an issue.
                    }
                }
            }, context().system().dispatcher()); //also tried context().dispatcher()

            Patterns.pipe(promise.wrapped(), context().dispatcher()).to(sender());
        } else {
            log.warning("Unexpected message type - Cache actor ignoring message: " + message.toString());
            unhandled(message);
        }
    }

If I capture sender() in a variable in the original context and replace the System.out.println with a reply to that sender, it works.


Solution

  • I have not worked with Akka in Java before, but I think your problem is in this line:

    promise.wrapped().recoverWith(new Recover<Future<T>>() {
    

    You are getting the Scala future there and calling recoverWith on it. But this does not change the original underlying future of the promise; it creates a new future instead. That is why you see the println (the recovery does run) while the original future, which is the one you pipe to the sender, still fails. I would therefore recommend changing the code as follows: assign the new future to a variable and pipe that one to your sender:

    Future<T> recovered = promise.wrapped().recoverWith(new Recover<Future<T>>() {
    ...
    Patterns.pipe(recovered, context().dispatcher()).to(sender());
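
The same "recovery returns a new future" behaviour can be seen without Akka. The following is only a rough analogy using the JDK's CompletableFuture, not Akka's Future or its Recover class. The names RecoverDemo, failedLookup, askService and recoverWith are hypothetical stand-ins for asyncService.get, Patterns.ask and Future.recoverWith from the question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class RecoverDemo {

    // Hypothetical stand-in for asyncService.get(key): a future that has already failed.
    static CompletableFuture<String> failedLookup() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("cache miss"));
        return f;
    }

    // Hypothetical stand-in for Patterns.ask(serviceActor, key, timeout).
    static CompletableFuture<String> askService(String key) {
        return CompletableFuture.completedFuture("value-for-" + key);
    }

    // Rough analogue of recoverWith: builds and returns a NEW future.
    // The 'original' future is only read, never modified.
    static CompletableFuture<String> recoverWith(CompletableFuture<String> original, String key) {
        return original
            .<CompletableFuture<String>>handle((value, failure) -> {
                if (failure == null) {
                    // No failure: pass the value through unchanged.
                    return CompletableFuture.completedFuture(value);
                }
                // Dependent stages may wrap the cause in a CompletionException.
                Throwable cause = (failure instanceof CompletionException && failure.getCause() != null)
                        ? failure.getCause()
                        : failure;
                if (cause instanceof IllegalStateException) {
                    // Recoverable: fall back to the service.
                    return askService(key);
                }
                // Anything else is a real problem: keep the failure.
                CompletableFuture<String> stillFailed = new CompletableFuture<>();
                stillFailed.completeExceptionally(cause);
                return stillFailed;
            })
            .thenCompose(f -> f); // flatten the nested future
    }

    public static void main(String[] args) {
        CompletableFuture<String> original = failedLookup();
        CompletableFuture<String> recovered = recoverWith(original, "k");

        // The original future is still failed; awaiting it would still throw.
        System.out.println(original.isCompletedExceptionally()); // prints "true"
        // Only the returned future carries the recovered value.
        System.out.println(recovered.join()); // prints "value-for-k"
    }
}
```

In this sketch, passing original to a consumer reproduces the symptom from the question, while passing recovered gives the fallback value. That mirrors piping recovered instead of promise.wrapped() in the Akka code.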