scala akka agent

Concurrent Akka Agents in Scala


I'm working on a Scala project right now, and I've decided to use Akka's agent library instead of the actor model because it allows a more functional approach to concurrency. However, I'm having a problem running many agents at the same time: it seems that only three or four agents are ever running at once.

import akka.actor._
import akka.agent._
import scala.concurrent.ExecutionContext.Implicits.global

object AgentTester extends App {
    // Create the system for the actors that power the agents
    implicit val system = ActorSystem("ActorSystem")

    // Create an agent for each int between 1 and 10
    val agents = Vector.tabulate[Agent[Int]](10)(x => Agent[Int](1 + x))

    // Define a function for each agent to execute
    def printRecur(a: Agent[Int])(x: Int): Int = {
        // Print out the stored number and sleep.
        println(x)
        Thread.sleep(250)

        // Recur the agent
        a sendOff printRecur(a) _

        // Keep the agent's value the same
        x
    }

    // Start each agent
    for (a <- agents) {
        Thread.sleep(10)
        a sendOff printRecur(a) _
    }
}

The above code creates one agent for each integer between 1 and 10. The loop at the bottom sends the printRecur function to every agent. The output of the program should show the numbers 1 through 10 being printed roughly every quarter of a second, in no particular order. However, for some reason my output only ever shows the numbers 1 through 4.

Is there a more canonical way to use agents in Akka that will work? I come from a Clojure background and have used this pattern successfully there, so I naively used the same pattern in Scala.


Solution

  • My guess is that you are running on a 4-core box, and that is part of the reason you only ever see the numbers 1-4. The main factor is that you are using the default execution context, which on your system I'm guessing uses a thread pool with only 4 threads (one per core). Because each function blocks in Thread.sleep and then immediately re-sends itself, my guess is that the first 4 agents never relinquish those threads, so they are the only ones that ever print anything.
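
    To illustrate the kind of starvation I'm guessing at, here is a minimal sketch that uses only the standard library, not Akka. The names PoolSaturationSketch and run are made up for this illustration, and the fixed pool of 4 threads is an assumption standing in for the default execution context on a 4-core machine. It submits 10 blocking tasks and counts how many have started while the first ones are still holding their threads:

    ```scala
    import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
    import java.util.concurrent.atomic.AtomicInteger
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    // Hypothetical illustration: a small pool whose threads are all blocked.
    object PoolSaturationSketch {
      // Returns (tasks started while the pool was saturated, tasks finished in total).
      def run(poolSize: Int, taskCount: Int): (Int, Int) = {
        val pool = Executors.newFixedThreadPool(poolSize)
        implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

        val started = new AtomicInteger(0)
        val firstWave = new CountDownLatch(math.min(poolSize, taskCount))
        val release = new CountDownLatch(1)

        val tasks = (1 to taskCount).map { _ =>
          Future {
            started.incrementAndGet()
            firstWave.countDown()
            release.await() // block the pool thread, like Thread.sleep does
            1
          }
        }

        // Wait until every pool thread is busy, then give stragglers a chance.
        firstWave.await(5, TimeUnit.SECONDS)
        Thread.sleep(100)
        val startedWhileBlocked = started.get()

        // Unblock the threads so the queued tasks can finally run.
        release.countDown()
        val finished = Await.result(Future.sequence(tasks), 10.seconds).sum
        pool.shutdown()
        (startedWhileBlocked, finished)
      }

      def main(args: Array[String]): Unit = {
        val (blocked, done) = run(poolSize = 4, taskCount = 10)
        println(s"started while pool was saturated: $blocked, finished after release: $done")
      }
    }
    ```

    With 4 threads, only 4 of the 10 tasks can have started until something releases them. This sketch does not reproduce the recursive re-sending in your code; it only shows why a pool with one thread per core caps the number of blocking tasks that run at once.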

    You can easily fix this by removing this line:

    import scala.concurrent.ExecutionContext.Implicits.global
    

    and adding this line after you create the ActorSystem:

    import system.dispatcher
    

    This uses the actor system's default dispatcher, which is backed by a fork-join executor and does not seem to have the same issue as the default execution context you imported in your sample.

    You could also consider using send instead of sendOff, since send runs the function on the execution context that was in scope when you constructed the agent. I would expect sendOff to be for cases where you explicitly want to use a different execution context.
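
    Putting the import change together, the program from the question might look like the sketch below. It assumes an Akka version that still ships the akka-agent module (it was deprecated in Akka 2.5 and removed in 2.6) with both akka-actor and akka-agent on the classpath. The object name AgentTesterFixed is made up here, and I have not verified the output on your machine:

    ```scala
    import akka.actor.ActorSystem
    import akka.agent.Agent

    object AgentTesterFixed extends App {
      // Assumption: akka-actor and akka-agent are available as dependencies.
      implicit val system: ActorSystem = ActorSystem("ActorSystem")

      // Use the actor system's default dispatcher as the implicit ExecutionContext
      // instead of scala.concurrent.ExecutionContext.Implicits.global.
      import system.dispatcher

      // Create an agent for each int between 1 and 10
      val agents = Vector.tabulate[Agent[Int]](10)(x => Agent[Int](1 + x))

      // Print the stored number, sleep, then re-send this function to the same agent
      def printRecur(a: Agent[Int])(x: Int): Int = {
        println(x)
        Thread.sleep(250)
        a.sendOff(printRecur(a) _)
        x // keep the agent's value the same
      }

      // Start each agent
      for (a <- agents) {
        Thread.sleep(10)
        a.sendOff(printRecur(a) _)
      }
    }
    ```

    As in the original, this runs until you stop the process, because every agent keeps re-sending the function to itself.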