Using Akka-IO TCP, the procedure to establish connection in an actor is as follow:
class MyActor(remote: InetSocketAddress) extends Actor {
IO(Tcp) ! Connect(remote) //this is the first step, remote is the address to connect to
def receive = {
case CommandFailed(_: Connect) => context stop self // failed to connect
case Connected(remote, local) =>
val connection = sender()
connection ! Register(self)
// do cool things...
}
}
You send a Connect
message to IO(Tcp)
and expect to receive either a CommandFailed
or a Connected
message.
Now, my goal is to create an actor that wraps a TCP connection, but I want my actor to start accepting messages only once the connection is established - otherwise, while waiting for the Connected
message it would start accepting queries but would have no one to send them to.
What I tried:
class MyActor(address: InetSocketAddress) extends Actor {
def receive = {
case Initialize =>
IO(Tcp) ! Connect(address)
context.become(waitForConnection(sender()))
case other => sender ! Status.Failure(new Exception(s"Connection to $address not established yet."))
}
private def waitForConnection(initializer: ActorRef): Receive = {
case Connected(_, _) =>
val connection = sender()
connection ! Register(self)
initializer ! Status.Success(Unit)
// do cool things
case CommandFailed(_: Connect) =>
initializer ! Status.Failure(new Exception("Failed to connect to " + host))
context stop self
}
}
My first receive
is expecting a made-up Initialize
message that will trigger the whole connection process, once done the sender of Initialize
receives a success message and knows that it can know start sending queries.
I'm not very happy with it, it forces me to create my actor with
val actor = system.actorOf(MyActor.props(remote))
Await.ready(actor ? Initialize, timeout)
And it will not be very "restart" friendly.
Any idea to guarantee that my actor won't start receiving messages from the mailbox before the Tcp layer replies with a Connected
?
Use the Stash trait to stash messages you cannot handle right now. As each premature message arrives, use stash()
to defer it. Once the connection is open, use unstashAll()
to return those messages to the mailbox for processing. You can then use become()
to switch to the message processing state.