Search code examples
playframeworkplayframework-2.0akkaakka-remote-actor

Cross communicate between remote actor systems in distributed play framework application


I'm trying to figure out the best way to architect my application so I can send push notifications from my play framework application in a redundant way.

I want to implement a "rest period" to send a push notification to a mobile device 30 seconds after a user modifies data. If the user makes another modification within that 30 second span, the original notification needs to be canceled and replaced with a new notification that should be sent 30 seconds after the the latest modification, and so on.

The problem is that my API backends need to communicate to each other to ensure they don't send multiple notifications within that 30 second period, since they are load balanced. For example:

  1. User1 makes a modification which is sent to API Server1. A notification is triggered to happen in 30 seconds.
  2. User1 makes a second modification to the same record 5 Seconds later, which ends up being routed to API Server2. Another notification is triggered to be sent in 30 seconds, because it has no knowledge of the information Server1 received.

This is incorrect behavior - User1 should only receive one notification since the modifications occurred without the data being "at rest" for 30 seconds.

Since I'm not particularly familiar with Akka, this seemed like a good learning opportunity. It looks like I can fix this with Akka remoting.

This is the simplest architecture I could think of:

  • Create an akka system ("notifications") in each instance of the API with a router to send messages to each API instance, which each have one Akka actor ("notificationActor")

My application.conf looks like this:

akka {

  actor {
    provider = "akka.remote.RemoteActorRefProvider"

    deployment {
      /router {
        router = round-robin-group
        routees.paths = [
          "akka.tcp://notifications@server1:2552/user/notificationActor",
          "akka.tcp://notifications@server2:2552/user/notificationActor"]
      }
    }
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "server1" // set to server1 or server 2 upon deployment
      port = 2552
    }
  }
}

I'm setting up the system, actors and router like this:

// the system, instantiated once per server
private val system = ActorSystem("notifications")

// the local actor, instantiated once per server
private val notificationActor = system.actorOf(Props[NotificationActor], name = "notificationActor")

// the router, instantiated once per server
private val router = system.actorOf(FromConfig.props(Props[NotificationActor]), name = "router")

When I need a notification to be sent, I tell my actor to schedule it. This is so each system can hold onto the Cancellable instance in a key/value pair, and cancel the notification if the data is updated on a different server:

Client.scala (approximation, may have typos)

def updateData(data: DbData) = {
   // update data in db
   NotificationController.sendMessage(Notification(data.key))
}

NotificationController.scala (approximation, may have typos)

def sendMessage(pushNotification: Notification) = {

   // cancel all notifications related to this data on all servers
   router ! Broadcast(CancelNotification(pushNotification))

   // schedule the new notification on the next available actor
   router ! ScheduleNotification(pushNotification)
}

CancelNotification.scala (approximation, may have typos)

case class CancelNotification(pushNotification: Notification)

ScheduleNotification.scala (approximation, may have typos)

case class ScheduleNotification(pushNotification: Notification)

NotificationActor.scala (approximation, may have typos)

val cancellableMap: Map[String, Cancellable] = // (new concurrent hash map)
def receive: Receive = {
  case ScheduleNotification(pushNotification) => //uses - this.context.system.scheduler.scheduleOnce and stores the key/Cancellable pair in cancellableMap
  case CancelNotification(pushNotification) => //use stored Cancellable to cancel notification, if present
  case Notification(key) => //actually send the push notification
}

This works fine locally, but once I deploy it to my test environment (with multiple machines) every other message seems to be lost. I assume that's because it's attempting to send these messages to Server2, but I don't see any errors in the log file for either application. I've attempted to add more logging to my akka configuration, but I'm not seeing any extra output in logs/application.log (the default play framework log):

akka {

  loglevel = "DEBUG"
  log-config-on-start = on

  actor {
    provider = "akka.remote.RemoteActorRefProvider"

    deployment {
      /router {
        router = round-robin-group
        routees.paths = [
          "akka.tcp://notifications@server1:2552/user/notificationActor",
          "akka.tcp://notifications@server2:2552/user/notificationActor"]
      }
    }

    debug {
      unhandled = on
    }
  }
  remote {
    log-sent-messages = on
    log-received-messages = on
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "server1" // set to server1 or server 2 upon deployment
      port = 2552
    }
  }
}

Why isn't Server2 receiving messages? Is it ok that I'm instantiating an actor system with actors from all servers on every instance? Should they be able to cross communicate?

Also, if I'm overcomplicating this, I'm open to other solutions. This just seemed like the simplest approach if I can get it to work.


Solution

  • I think I figured it out. This architecture appears to work, but I had two issues that I resolved by running it from my local machine and adding server1 and server2 to my config.

    1. I wasn't instantiating my actor system when the application started - I was using lazy vals which meant the actor wasn't ready because it hadn't received a request that would cause it to be created. In other words, the notificationActor instance on Server2 wasn't listening as it wasn't yet initialized.

      To resolve this, I moved the initialization of all akka related components (the system, actor and router) to the onStart method of my GlobalSettings class so it's ready to go as soon as the play application starts. This was the primary issue.

    2. My messages weren't serializable.