Search code examples
multithreadingscalathread-safetyakkaakka-http

How to correctly (and safely) manage a Seq of objects in an akka actor?


Background

I have an akka Actor called Client that manages http and https connections to a server. The client has many features, including a ping service and a tokenFetcher service.

The client represents a 'connection' between one server and another. It is designed to allow a server to chat to another.

The process of the client is as such:

  1. Periodically ping the other server to see if it is online
  2. If the other server is online, do auth and get a token
  3. If the token is valid, clear out any calls that have been requested of us

It is step 3 I am struggling with. I would like to know how I would implement this safely between threads (actors).

What I Have Tried:

I'm using a Seq of messages that the client would store, like such:


case class SendApiCall(apiCall: ApiCall, route: String, var sent: Boolean = false)

class Client(server: Server) extends Actor {
    private var apiCalls: Seq[SendApiCall] = Seq.empty[SendApiCall]

    ...

    override def receive: Receive = {
        case sendApiCall@SendApiCall(_, _, _) =>
            if (server.onlineStatus == OFFLINE) {
                apiCalls = apiCalls.appended(sendApiCall)
            }
            else {
                sendApiCall(sendApiCall)
            }
        
        case ServerOnline() => // <- this is send to us from the ping service when it first detects the server is online
            
            apiCalls.iterator.foreach( apiCallRequest =>
                if (!apiCallRequest.sent) {
                    sendApiCall(apiCallRequest)
                    apiCallRequest.sent = true
                }
                apiCallRequest
            )

            apiCalls = apiCalls.filterNot(apiCallRequest => apiCallRequest.sent)
    }
}

However, I believe apiCalls is a mutable state in this scenario? I would like to know:

  1. Is this thread safe?
  2. How would I make it thread safe, if it isn't?

Solution

  • Given that your code is running inside an actor, and you seem to be following "the rules", it looks thread-safe to me.

    Actors work in terms of a "mailbox", which can receive messages from many sources, possibly concurrently, but will only dispatch the messages to your actor's receive message sequentially. So while the receive function may not always run on the same thread, you can effectively consider your actor as single-threaded. As long as you don't do anything to make code inside your receive function run concurrently with itself, the sequential-ness of the actor/mailbox model makes it safe.

    To illustrate something non-thread-safe that would compromise your thread safety, consider a function that you could call to trigger some asynchronous action with a callback; if your callback was able to directly interact with your actor's internal state, that would be non-thread-safe:

    def exampleAsyncFunction: Future[Int] = ???
    
    class MyActor extends Actor {
    
      private var myInternalState: Int = 0
    
      override def receive = {
        case SomeMessage =>
          // BAD!
          exampleAsyncFunction.onComplete { i =>
            // here, you've captured a reference to this actor, and
            // are directly manipulating its internal state, possibly
            // from outside the sequential context of the `receive` function
            myInternalState += i
          }
    
        case SomeOtherMessage =>
          // OK
          myInternalState += 1
      }
    }
    

    The above example would not be thread-safe, because the async callback function triggered by SomeMessage could potentially execute concurrently with the receive logic for SomeOtherMessage, causing conflicting modifications to myInternalState.

    A potential way to make the above example safe is to have the async callback interact with the actor's mailbox instead of directly with its internal state:

    def receive = {
    
      case SomeMessage => 
        val me = self // capture `self` before going into async context
    
        exampleAsyncFunction.onComplete { i =>
          // BETTER - sending messages to an actor is safe
          me ! IncrementCounter(i)
        }
    
      case IncrementCounter(i) =>
        // safe to modify internal state because it is a direct response
        // to a received message, as part of the receive loop
        myInternalState += i
    
      ...
    }
    

    In the example you gave in your post, it doesn't look like you are doing anything to cause concurrent modifications to your internal state, so it is thread-safe.