I have an actor that looks like this. When I await.result on the exception I get the IllegalState exception despite the fact that I'm using recoverWith. I can see the system print ln working so I know this is composed correctly. Also this is harder in java than it is in scala!
What's up with that? I thought it was supposed to no longer return an exception.
public void onReceive(final Object message) throws Exception {
if (message instanceof MyMessage) {
final String key = ((messages.MyMessage) message).getKey();
F.Promise<T> promise = asyncService.get(key);
promise.wrapped().recoverWith(new Recover<Future<T>>() {
@Override
public Future<T> recover(Throwable failure) throws Throwable {
if (failure instanceof IllegalStateException) {
Future future = Patterns.ask(serviceActor, key, timeout);
future.onSuccess(new OnSuccess() {
@Override
public void onSuccess(Object result) throws Throwable {
System.out.println("Here");
}
}, context().dispatcher());
return future;
} else {
throw failure; //there is actually an issue.
}
}
}, context().system().dispatcher()); //also tried context().dispatcher()
Patterns.pipe(promise.wrapped(), context().dispatcher()).to(sender());
} else {
log.warning("Unexpected message type - Cache actor ignoring message: " + message.toString());
unhandled(message);
}
}
If I make a sender var in the original context and replace the System.out.println with a reply to the sender it works.
I have not worked with Akka in Java before, but i think your problem is in this line:
promise.wrapped().recoverWith(new Recover<Future<T>>() {
You are getting the scala future there and you are calling recoverWith
. But this does not change the original underlying future of the promise! It instead creates a new future. Therefore i would recommend to change the code like this. You are assigning this new future to a variable and pipe this new one to your sender:
Future<T> recovered = promise.wrapped().recoverWith(new Recover<Future<T>>() {
...
Patterns.pipe(recovered, context().dispatcher()).to(sender());