I am learning Akka Actor recently. I read the document of dispatchers in Actor. I am curious about the blocking operation in an actor. The last topic in the document describes how to solve the problem. And I am trying to reproduce the example experiment in the document.
Here is my code:
package dispatcher
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object Main extends App{
var config = ConfigFactory.parseString(
"""
|my-dispatcher{
|type = Dispatcher
|
|executor = "fork-join-executor"
|
|fork-join-executor{
|fixed-pool-size = 32
|}
|throughput = 1
|}
""".stripMargin)
// val system = ActorSystem("block", ConfigFactory.load("/Users/jiexray/IdeaProjects/ActorDemo/application.conf"))
val system = ActorSystem("block")
val actor1 = system.actorOf(Props(new BlockingFutureActor()))
val actor2 = system.actorOf(Props(new PrintActor()))
for(i <- 1 to 1000){
actor1 ! i
actor2 ! i
}
}
package dispatcher
import akka.actor.Actor
import scala.concurrent.{ExecutionContext, Future}
class BlockingFutureActor extends Actor{
override def receive: Receive = {
case i: Int =>
Thread.sleep(5000)
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
Thread.sleep(5000)
println(s"Blocking future finished ${i}")
}
}
}
package dispatcher
import akka.actor.Actor
class PrintActor extends Actor{
override def receive: Receive = {
case i: Int =>
println(s"PrintActor: ${i}")
}
}
I simply create an ActorSystem
with the default dispatchers and all actors depend on those. The BlockingFutureActor
has a blocking operation that is encapsulated in a Future
. The PrintActor
is merely printing a number instantly.
In the document's explanation, the default dispatchers will be occupied by Future
s in the BlockingFutureActor
, which leads to the message blocking of the PrintActor
. The application gets stuck somewhere like:
> PrintActor: 44
> PrintActor: 45
Unfortunately, my code is not blocked. All outputs from PrintActor
show up smoothly. But outputs from BlockingFutureActor
show up like squeezing toothpaste. I try to monitor my thread info by Intellij's Debug, I got:
You may find only two dispatchers are sleeping(BlockingFutureActor
makes this happen). Others are waiting, which means they are available for new message delivering.
I have read an answer about blocking operation in Actor(page). It is quoted that "Dispatchers are, effectively, thread-pools. Separating the two guarantees that the slow, blocking operations don't starve the other. This approach, in general, is referred to as bulk-heading, because the idea is that if a part of the app fails, the rest remains responsive."
Do default dispatchers spare some dispatcher for blocking operation? Such that the system can handle messages even if there are so many blocking operations asking for dispatchers.
Can the experiment in the Akka document be reproduced? Is there something wrong with my configuration.
Thanks for your suggestions. Best Wishes.
The reason you see all 1000 print statements from the PrintActor
before any print statements from the BlockingFutureActor
is because of the first Thread.sleep
call in the BlockingFutureActor
's receive
block. This Thread.sleep
is the key difference between your code and the example in the official documentation:
override def receive: Receive = {
case i: Int =>
Thread.sleep(5000) // <----- this call is not in the example in the official docs
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
...
}
}
Remember that actors process one message at a time. The Thread.sleep(5000)
basically simulates a message that takes at least five seconds to process. The BlockingFutureActor
won't process another message until it's done processing the current message, even if it has hundreds of messages in its mailbox. While the BlockingFutureActor
is processing that first Int
message of value 1
, the PrintActor
has already finished processing all 1000 messages that were sent to it. To make this more clear, let's add a println
statement:
override def receive: Receive = {
case i: Int =>
println(s"Entering BlockingFutureActor's receive: $i") // <-----
Thread.sleep(5000)
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
...
}
}
A sample output when we run the program:
Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
PrintActor: 3
...
PrintActor: 1000
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Blocking future finished 1
...
As you can see, by the time the BlockingFutureActor
actually begins to process the message 2
, the PrintActor
has already churned through all 1000 messages.
If you remove that first Thread.sleep
, then you'll see messages dequeued from the BlockingFutureActor
's mailbox more quickly, because the work is being "delegated" to a Future
. Once the Future
is created, the actor grabs the next message from its mailbox without waiting for the Future
to complete. Below is a sample output without that first Thread.sleep
(it won't be exactly the same every time you run it):
Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
...
PrintActor: 84
PrintActor: 85
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Entering BlockingFutureActor's receive: 4
Entering BlockingFutureActor's receive: 5
PrintActor: 86
PrintActor: 87
...