I am using Akka actors to update data on my web client. One of those actors is solely responsible for sending updates concerning single Agents. These agents are updated very rapidly (every 10 ms). My goal now is to throttle this updating mechanism so that the newest version of every Agent is sent every 300 ms.
This is what I came up with so far:
/**
 * Single agents are updated very rapidly. To limit the burden on the web-frontend, we throttle the messages here.
 */
class BroadcastSingleAgentActor extends Actor {

  private implicit val ec: ExecutionContextExecutor = context.dispatcher
  private var queue = Set[Agent]()

  context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
    queue.foreach { a =>
      broadcastAgent(self)(a) // sends the message to all connected clients
    }
    queue = Set()
  }

  override def receive: Receive = {
    // this message is received every 10 ms for every agent present
    case BroadcastAgent(agent) =>
      // only keep the newest version of the agent
      queue = queue.filter(_.id != agent.id) + agent
  }
}
This actor (BroadcastSingleAgentActor) works as expected, but I am not 100% sure whether it is thread safe (updating the queue while potentially clearing it). Also, it does not feel like I am making the most of the tools Akka provides. I found this article (Throttling Messages in Akka 2), but my problem is that I need to keep the newest Agent message while dropping any older version of it. Is there an example somewhere similar to what I need?
No, this isn't thread safe, because the block scheduled via the ActorSystem scheduler runs on a different thread than receive, so both can read and reassign queue at the same time. One potential idea is to do the flushing within the receive method, triggered by a scheduled Refresh message, because incoming messages to the BroadcastSingleAgentActor are handled sequentially.
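// Needs: import scala.concurrent.duration._
// Schedule only a message; the scheduler thread never touches queue.
override def preStart(): Unit = {
  context.system.scheduler.schedule(0.seconds, 300.milliseconds, self, Refresh)
}

override def receive: Receive = {
  case Refresh =>
    // handled like any other message, so queue is only touched by the actor itself
    queue.foreach { a =>
      broadcastAgent(self)(a) // sends the message to all connected clients
    }
    queue = Set()
  // this message is received every 10 ms for every agent present
  case BroadcastAgent(agent) =>
    // only keep the newest version of the agent
    queue = queue.filter(_.id != agent.id) + agent
}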
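Separately from the scheduling question, the "keep only the newest version per agent" bookkeeping can be sketched on its own, without Akka. This is a minimal sketch under assumptions: the Agent case class below is a hypothetical stand-in for the one in the question (only its id matters), and LatestBuffer is a name made up for illustration. Keying a Map by id replaces the filter-then-add on a Set with a single update.

```scala
// Hypothetical stand-in for the Agent type from the question; only `id` matters here.
final case class Agent(id: Int, payload: String)

// Immutable "latest value per id" buffer (illustrative name, not an Akka type).
final case class LatestBuffer(byId: Map[Int, Agent] = Map.empty) {

  // A newer Agent with the same id overwrites the older one.
  def offer(agent: Agent): LatestBuffer =
    copy(byId = byId.updated(agent.id, agent))

  // Returns the pending agents together with an emptied buffer.
  def drain: (Iterable[Agent], LatestBuffer) =
    (byId.values, LatestBuffer())
}

object LatestBufferDemo {
  def main(args: Array[String]): Unit = {
    val buffer = LatestBuffer()
      .offer(Agent(1, "v1"))
      .offer(Agent(2, "v1"))
      .offer(Agent(1, "v2")) // replaces Agent(1, "v1")

    val (pending, emptied) = buffer.drain
    println(pending.toList.sortBy(_.id)) // one entry per id, newest version only
    println(emptied.byId.isEmpty)
  }
}
```

Inside the actor, the BroadcastAgent case would call offer and the Refresh case would call drain, so the state stays a single var that is only reassigned from receive.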