I am developing an application that creates Akka actors to process messages coming from a Kafka topic. Messages with the same key are processed by the same actor, and I also use the message key as the name of the corresponding actor.
When a new message is read from the topic, I don't know whether the actor whose name equals the message key has already been created by the actor system. Therefore, I try to resolve the actor by its name and, if it does not exist yet, I create it. I also need to handle concurrency during actor resolution, because more than one client may ask the actor system at the same time whether an actor exists.
The code I am using right now is the following:
private CompletableFuture<ActorRef> getActor(String uuid) {
    return system.actorSelection(String.format("/user/%s", uuid))
                 .resolveOne(Duration.ofMillis(1000))
                 .toCompletableFuture()
                 .exceptionally(ex ->
                     system.actorOf(Props.create(MyActor.class, uuid), uuid))
                 .exceptionally(ex -> {
                     try {
                         return system.actorSelection(String.format("/user/%s", uuid))
                                      .resolveOne(Duration.ofMillis(1000))
                                      .toCompletableFuture()
                                      .get();
                     } catch (InterruptedException | ExecutionException e) {
                         throw new RuntimeException(e);
                     }
                 });
}
The above code is not optimised, and the exception handling could be improved.
However, is there a more idiomatic way in Akka to resolve an actor, or to create it if it does not exist? Am I missing something?
Consider creating an actor that maintains as its state a map of message IDs to ActorRefs. This "receptionist" actor would handle all requests to obtain a message processing actor. When the receptionist receives a request for an actor (the request would include the message ID), it tries to look up an associated actor in its map: if such an actor is found, it returns the ActorRef to the sender; otherwise it creates a new processing actor, adds that actor to its map, and returns that actor reference to the sender.
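The lookup-or-create logic described above can be sketched roughly as follows. To keep the example runnable without any dependencies, this is a plain-Java model rather than Akka code: a single-thread executor stands in for the receptionist's mailbox, and the class name Receptionist, the method getOrCreate, and the factory parameter are all hypothetical names chosen for illustration. In a real Akka receptionist the same logic would sit in the actor's message handler, with the factory being a call such as getContext().actorOf(...) and the result sent back to the sender.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical plain-Java model of the receptionist; not part of the Akka API.
final class Receptionist<R> {
    // Touched only from the mailbox thread, so a plain HashMap is sufficient.
    private final Map<String, R> workers = new HashMap<>();

    // One thread handles requests one at a time, like an actor's mailbox.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    // Creates the worker for a key the first time that key is requested.
    private final Function<String, R> factory;

    Receptionist(Function<String, R> factory) {
        this.factory = factory;
    }

    // Returns the existing worker for the key, or creates and stores a new one.
    CompletableFuture<R> getOrCreate(String key) {
        return CompletableFuture.supplyAsync(
            () -> workers.computeIfAbsent(key, factory), mailbox);
    }

    // Stops the mailbox thread so the JVM can exit.
    void shutdown() {
        mailbox.shutdown();
    }
}
```

Because every request goes through the same single-threaded mailbox, two clients asking for the same key at the same time cannot both trigger a creation: the second request is only processed after the first has stored its result in the map. That is the property the receptionist actor gives you, and it removes the need for the resolve, create, resolve-again chain in the question.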