scala, akka, akka-io

Initializing an akka actor using Akka-IO TCP


Using Akka-IO TCP, the procedure to establish a connection in an actor is as follows:

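import java.net.InetSocketAddress
import akka.actor.Actor
import akka.io.{IO, Tcp}
import akka.io.Tcp._

class MyActor(remote: InetSocketAddress) extends Actor {
  import context.system // implicit ActorSystem required by IO(Tcp)

  IO(Tcp) ! Connect(remote) // first step: remote is the address to connect to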

  def receive = {
    case CommandFailed(_: Connect) => context stop self // failed to connect

    case Connected(remote, local) =>
      val connection = sender()
      connection ! Register(self)
      // do cool things...  
  }
}

You send a Connect message to IO(Tcp) and expect to receive either a CommandFailed or a Connected message.

Now, my goal is to create an actor that wraps a TCP connection, but I want my actor to start accepting messages only once the connection is established - otherwise, while waiting for the Connected message it would start accepting queries but would have no one to send them to.

What I tried:

class MyActor(address: InetSocketAddress) extends Actor {

  def receive = {
    case Initialize =>
      IO(Tcp) ! Connect(address)
      context.become(waitForConnection(sender()))

    case _ => sender() ! Status.Failure(new Exception(s"Connection to $address not established yet."))
  }

  private def waitForConnection(initializer: ActorRef): Receive = {
    case Connected(_, _) =>
      val connection = sender()
      connection ! Register(self)
      initializer ! Status.Success(()) // send the unit value, not the Unit companion object
      // do cool things

    case CommandFailed(_: Connect) =>
      initializer ! Status.Failure(new Exception(s"Failed to connect to $address"))
      context stop self
  }
}

My first receive expects a made-up Initialize message that triggers the whole connection process. Once the connection is established, the sender of Initialize receives a success message and knows that it can now start sending queries.

I'm not very happy with this, because it forces me to create my actor with

val actor = system.actorOf(MyActor.props(remote))
Await.ready(actor ? Initialize, timeout)

It is also not very "restart"-friendly: after a restart the actor is back in its initial state, and nothing sends it Initialize again.

Any idea how to guarantee that my actor won't start processing messages from its mailbox before the TCP layer replies with Connected?


Solution

  • Use the Stash trait to stash messages you cannot handle right now. As each premature message arrives, use stash() to defer it. Once the connection is open, use unstashAll() to return those messages to the mailbox for processing. You can then use become() to switch to the message processing state.
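
The stash-based approach could look roughly like the sketch below. It is only an illustration under some assumptions: the names TcpClient, listener and connected are hypothetical, queries are assumed to arrive as ByteString, and incoming data is simply forwarded to the listener. The Connect request is sent from preStart(), which by default also runs after a restart, so no separate Initialize message is needed.

```scala
import java.net.InetSocketAddress

import akka.actor.{Actor, ActorRef, Props, Stash}
import akka.io.{IO, Tcp}
import akka.util.ByteString

// Hypothetical names throughout: TcpClient, listener and the
// ByteString-based query protocol are assumptions for illustration.
object TcpClient {
  def props(remote: InetSocketAddress, listener: ActorRef): Props =
    Props(new TcpClient(remote, listener))
}

class TcpClient(remote: InetSocketAddress, listener: ActorRef)
    extends Actor with Stash {

  import Tcp._
  import context.system // implicit ActorSystem required by IO(Tcp)

  // Runs on first start and, with the default postRestart, after each restart.
  override def preStart(): Unit = IO(Tcp) ! Connect(remote)

  // Initial state: handle connection events only and defer everything else.
  def receive: Receive = {
    case Connected(_, _) =>
      val connection = sender()
      connection ! Register(self)
      unstashAll() // return deferred messages to the mailbox
      context.become(connected(connection))

    case CommandFailed(_: Connect) =>
      context.stop(self) // could not connect, so stashed queries are never processed

    case _ =>
      stash() // premature message: keep it until the connection is open
  }

  // Connected state: queries can now be written to the socket.
  def connected(connection: ActorRef): Receive = {
    case query: ByteString   => connection ! Write(query)
    case Received(data)      => listener ! data
    case _: ConnectionClosed => context.stop(self)
  }
}
```

With an actor shaped like this, the caller can create it with system.actorOf(TcpClient.props(remote, listener)) and send queries right away, without Await: anything that arrives before Connected is stashed and then processed in its original order once the connection is open.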