
Actors, ForkJoinPool, and ordering of messages


I need help understanding how an Actor system can use ForkJoinPool and maintain ordering guarantees.

I have been playing with Actr (https://github.com/zakgof/actr), which is a small, simple actor system. I think my question applies to Akka as well. I have a simple bit of code that sends the numbers 1 to 10 to one Actor. The Actor just prints the messages, and they are not in order: I get 1,2,4,3,5,6,8,7,9,10.

I think this has to do with the ForkJoinPool. Actr wraps a message into a Runnable and sends it to the ForkJoin Executor. When the task executes, it puts the message onto the destination Actor's queue and processes it. My understanding of ForkJoinPool is that tasks are distributed to multiple threads. I've added logging, and the messages 1,2,3,... are being distributed to different threads and put onto the Actor's queue out of order.
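To illustrate the effect described above, here is a hypothetical simplification of a one-task-per-message scheduler (it is not Actr's actual code): each send becomes its own ForkJoinPool task that enqueues the message and then handles one message from the queue. `NaiveScheduling`, `send`, and `run` are illustrative names.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class NaiveScheduling {
    // Hypothetical stand-in for an actor's mailbox.
    private final ConcurrentLinkedQueue<Integer> actorQueue = new ConcurrentLinkedQueue<>();
    // Records the order in which the "actor" handled messages.
    final List<Integer> processed = new CopyOnWriteArrayList<>();

    // One pool task per message: nothing forces task N to run before task N+1,
    // and several workers may even run the "actor" at the same time.
    void send(ForkJoinPool pool, int msg) {
        pool.execute(() -> {
            actorQueue.add(msg);               // workers race to reach this line
            Integer next = actorQueue.poll();
            if (next != null) {
                processed.add(next);           // stands in for "print the message"
            }
        });
    }

    // Sends 1..count through a 4-thread pool and returns the handling order.
    static List<Integer> run(int count) {
        NaiveScheduling actor = new NaiveScheduling();
        ForkJoinPool pool = new ForkJoinPool(4);
        for (int i = 1; i <= count; i++) {
            actor.send(pool, i);
        }
        pool.shutdown();                       // already-submitted tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return actor.processed;
    }

    public static void main(String[] args) {
        // Every message arrives, but the printed order can differ between runs.
        System.out.println(run(10));
    }
}
```

All ten messages get handled, but the order depends on which worker reaches `actorQueue.add` first, and two workers can even run the handler concurrently, which an actor should never allow.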

Am I missing something? Actr's Scheduler is similar to Akka's Dispatcher, and it can be found here: https://github.com/zakgof/actr/blob/master/src/main/java/com/zakgof/actr/impl/ExecutorBasedScheduler.java

The ExecutorBasedScheduler is constructed with a ForkJoinPool.commonPool like so:

public static IActorScheduler newForkJoinPoolScheduler(int throughput) {
    return new ExecutorBasedScheduler(ForkJoinPool.commonPool(), throughput);
}

How can an Actor use ForkJoinPool and keep messages in order?


Solution

  • I can't speak to Actr at all, but in Akka the individual messages are not created as ForkJoinPool tasks. (One task per message seems like a very bad approach for many reasons, not just ordering. Messages can typically be processed very quickly, so with one task per message the scheduling overhead would be awfully high. You want some batching, at least under load, so that you get better thread locality and less overhead.)

    Essentially, in Akka, the actor mailboxes are queues within an object. When the mailbox receives a message, it checks whether it has already scheduled a task; if not, it adds a new task to the ForkJoinPool. So the ForkJoinPool task isn't "process this message" but "process the Runnable associated with this specific Actor's mailbox". Some time then passes before the pool picks up the task and the Runnable runs. By then, the mailbox may have received many more messages. They will simply have been added to the queue, and the Runnable will process as many of them as it is configured to (the dispatcher's throughput setting), in the order in which they were received.
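A minimal Java sketch of the scheme just described, assuming nothing about Akka's real classes: `Mailbox`, `tell`, `trySchedule`, `MailboxDemo`, and the throughput of 5 are hypothetical names and values. The pool task is the mailbox itself, a compare-and-set flag keeps at most one task per mailbox pending or running, and each run drains a bounded batch in FIFO order.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class MailboxDemo {
    // Hypothetical mailbox: the pool task means "drain this mailbox", not "handle this message".
    static final class Mailbox<T> implements Runnable {
        private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicBoolean scheduled = new AtomicBoolean(false);
        private final Executor executor;
        private final Consumer<T> behavior;  // the actor's message handler
        private final int throughput;        // max messages handled per pool task

        Mailbox(Executor executor, Consumer<T> behavior, int throughput) {
            this.executor = executor;
            this.behavior = behavior;
            this.throughput = throughput;
        }

        // Enqueue in FIFO order, then make sure one drain task is pending.
        void tell(T msg) {
            queue.add(msg);
            trySchedule();
        }

        private void trySchedule() {
            // The CAS lets only one caller submit a task for this mailbox.
            if (!queue.isEmpty() && scheduled.compareAndSet(false, true)) {
                executor.execute(this);
            }
        }

        @Override
        public void run() {
            try {
                // Batch: handle up to `throughput` messages, oldest first.
                for (int i = 0; i < throughput; i++) {
                    T msg = queue.poll();
                    if (msg == null) {
                        break;
                    }
                    behavior.accept(msg);  // never runs on two threads at once
                }
            } finally {
                scheduled.set(false);
                trySchedule();  // resubmit only if messages are still waiting
            }
        }
    }

    // Sends 1..count to one actor on the common pool and returns what it saw.
    static List<Integer> run(int count) {
        List<Integer> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        Mailbox<Integer> actor = new Mailbox<>(ForkJoinPool.commonPool(),
                msg -> { seen.add(msg); done.countDown(); }, 5);
        for (int i = 1; i <= count; i++) {
            actor.tell(i);
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(10));  // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

Because at most one drain task per mailbox exists at any moment, messages from a single sender are handled in the order they were enqueued, even though successive batches may run on different pool threads.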

    This is why, in Akka, you can guarantee the order of messages within a mailbox, but cannot guarantee the order of messages sent to different Actors. If I send message A to Actor Alpha, then message B to Actor Beta, then message C to Actor Alpha, I can guarantee that A will be before C. But B might happen before, after, or at the same time as A and C. (Because A and C will be handled by the same task, but B will be a different task.)
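One way to see this per-actor guarantee in plain Java is to give each actor its own serializing wrapper over the shared pool. The `SerialExecutor` below is adapted from the example in the `java.util.concurrent.Executor` Javadoc; `PerActorOrdering`, `alpha`, `beta`, and `run` are illustrative names, not Akka API. Unlike Akka's mailbox approach, it still submits one pool task per message (just never two at once for the same actor), so it has no batching.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class PerActorOrdering {
    // Adapted from the SerialExecutor example in the Executor Javadoc:
    // runs tasks one at a time, in submission order, on a shared executor.
    static final class SerialExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private final Executor executor;
        private Runnable active;

        SerialExecutor(Executor executor) {
            this.executor = executor;
        }

        @Override
        public synchronized void execute(Runnable r) {
            tasks.add(() -> {
                try {
                    r.run();
                } finally {
                    scheduleNext();  // hand the next queued task to the pool
                }
            });
            if (active == null) {
                scheduleNext();
            }
        }

        private synchronized void scheduleNext() {
            if ((active = tasks.poll()) != null) {
                executor.execute(active);
            }
        }
    }

    // Sends A to alpha, B to beta, C to alpha; returns [alphaLog, betaLog].
    static List<List<String>> run() {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        SerialExecutor alpha = new SerialExecutor(pool);  // one wrapper per actor
        SerialExecutor beta = new SerialExecutor(pool);
        List<String> alphaLog = new CopyOnWriteArrayList<>();
        List<String> betaLog = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);

        alpha.execute(() -> { alphaLog.add("A"); done.countDown(); });
        beta.execute(() -> { betaLog.add("B"); done.countDown(); });
        alpha.execute(() -> { alphaLog.add("C"); done.countDown(); });

        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.of(alphaLog, betaLog);
    }

    public static void main(String[] args) {
        System.out.println(run());  // prints [[A, C], [B]]
    }
}
```

Alpha always records A before C, and Beta records B, but nothing orders B relative to A or C: the two wrappers submit to the pool independently.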

    Message Ordering Docs: More details on what is and isn't guaranteed regarding ordering.

    Dispatcher Docs: Dispatchers are the connection between Actors and the actual execution. ForkJoinPool is only one implementation (although a very common one).

    EDIT: Just thought I'd add some links to the Akka source to illustrate. Note that these are all internal APIs: tell is how you use it, and all of this happens behind the scenes. (I'm using permalinks so that my links don't rot, but be aware that Akka may have changed in the version you are using.)

    The key bits are in akka.dispatch.Dispatcher.scala

    Your tell will jump through some hoops to get to the right mailbox, but eventually:

    • dispatch: called to enqueue the message. This is very simple: it just enqueues and calls the registerForExecution method.
    • registerForExecution: first checks whether scheduling is needed. If it is, it uses the executorService to schedule the mailbox. Note that the executorService is abstract, but execute is called on that service with the mailbox as the argument.
    • execute: assuming the implementation is ForkJoinPool, this is the executorService execute method we end up in. Essentially, it just creates a ForkJoinTask with the supplied argument (the mailbox) as the Runnable.
    • run: the Mailbox is conveniently a Runnable, so the ForkJoinPool will eventually call this method once the task is scheduled. You can see that it processes special system messages, then calls processMailbox, then (in a finally) calls registerForExecution again. Because registerForExecution checks whether scheduling is needed first, this isn't an infinite loop; it's just checking whether there is remaining work to do. While you are in the Mailbox class, you can also look at some of the methods the Dispatcher uses to check whether scheduling is needed, to actually add messages to the queue, etc.
    • processMailbox: essentially just a loop calling actor.invoke, except that it has to do lots of checking: whether there are system messages, whether it's out of work, whether it has passed a threshold, whether it has been interrupted, etc.
    • invoke: where the code you write (the receiveMessage) actually gets called.

    If you actually click through all of those links you'll see that I'm simplifying a lot. There's lots of error handling and code to make sure everything is thread safe, super efficient, and bulletproof. But that's the gist of the code flow.
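The call chain walked through above can be condensed into a small Java sketch. The method names (`dispatch`, `registerForExecution`, `run`, `processMailbox`, `invoke`) are borrowed from that walkthrough, but the bodies are a simplification, not Akka's Scala source; `SimpleDispatcher`, the single `"create"` system message, and the throughput of 5 are hypothetical, and interruption checks, deadlines, and real system-message semantics are left out.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical, heavily simplified analogue of the Dispatcher/Mailbox pair; not Akka's code.
public class SimpleDispatcher {
    private final ExecutorService executorService;  // e.g. a ForkJoinPool
    private final int throughput;                   // max user messages per run()

    public SimpleDispatcher(ExecutorService executorService, int throughput) {
        this.executorService = executorService;
        this.throughput = throughput;
    }

    // dispatch: enqueue the message, then call registerForExecution.
    public void dispatch(Mailbox mbox, Object msg) {
        mbox.messageQueue.add(msg);
        registerForExecution(mbox);
    }

    // registerForExecution: submit the mailbox only if it has work and is not already scheduled.
    void registerForExecution(Mailbox mbox) {
        boolean hasWork = !mbox.systemQueue.isEmpty() || !mbox.messageQueue.isEmpty();
        if (hasWork && mbox.scheduled.compareAndSet(false, true)) {
            executorService.execute(mbox);  // the mailbox itself is the Runnable
        }
    }

    // Inner class, so run() can reach throughput and registerForExecution directly.
    public class Mailbox implements Runnable {
        final ConcurrentLinkedQueue<String> systemQueue = new ConcurrentLinkedQueue<>();
        final ConcurrentLinkedQueue<Object> messageQueue = new ConcurrentLinkedQueue<>();
        final AtomicBoolean scheduled = new AtomicBoolean(false);
        final List<String> handledSystemMessages = new CopyOnWriteArrayList<>();
        private final Consumer<Object> receiveMessage;  // the user's handler

        public Mailbox(Consumer<Object> receiveMessage) {
            this.receiveMessage = receiveMessage;
        }

        // run: system messages first, then a bounded batch, then re-register in finally.
        @Override
        public void run() {
            try {
                processAllSystemMessages();
                processMailbox();
            } finally {
                scheduled.set(false);
                registerForExecution(this);  // no-op unless work remains
            }
        }

        private void processAllSystemMessages() {
            String sys;
            while ((sys = systemQueue.poll()) != null) {
                handledSystemMessages.add(sys);  // real lifecycle handling omitted
            }
        }

        // processMailbox: call invoke until the queue is empty or throughput is used up.
        private void processMailbox() {
            for (int left = throughput; left > 0; left--) {
                Object msg = messageQueue.poll();
                if (msg == null) return;
                invoke(msg);
            }
        }

        // invoke: where the user's handler (receiveMessage) finally runs.
        private void invoke(Object msg) {
            receiveMessage.accept(msg);
        }
    }

    // Queues one system message, then sends 1..count; returns what the handler saw.
    static List<Object> run(int count) {
        SimpleDispatcher dispatcher = new SimpleDispatcher(ForkJoinPool.commonPool(), 5);
        List<Object> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        Mailbox mbox = dispatcher.new Mailbox(msg -> { received.add(msg); done.countDown(); });
        mbox.systemQueue.add("create");  // picked up by the first run()
        for (int i = 1; i <= count; i++) {
            dispatcher.dispatch(mbox, i);
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```

The `finally` block mirrors the point made about run: re-registering is cheap because registerForExecution does nothing when the queues are empty or another run is already pending.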