Background
I have an Akka Actor called Client that manages http and https connections to a server. The client has many features, including a ping service and a tokenFetcher service.
The client represents a 'connection' between one server and another. It is designed to allow a server to chat to another.
The process of the client is as follows:
1. ping the other server to see if it is online
2. if the server is online, do auth and get a token

It is step 3 I am struggling with. I would like to know how I would implement this safely between threads (actors).
What I Have Tried:
I'm using a Seq of messages that the client would store, like so:
    case class SendApiCall(apiCall: ApiCall, route: String, var sent: Boolean = false)

    class Client(server: Server) extends Actor {
      private var apiCalls: Seq[SendApiCall] = Seq.empty[SendApiCall]
      ...
      override def receive: Receive = {
        // bound as `request` so the pattern variable does not shadow the sendApiCall method
        case request @ SendApiCall(_, _, _) =>
          if (server.onlineStatus == OFFLINE) {
            apiCalls = apiCalls.appended(request)
          } else {
            sendApiCall(request)
          }
        case ServerOnline() => // <- this is sent to us from the ping service when it first detects the server is online
          apiCalls.foreach { apiCallRequest =>
            if (!apiCallRequest.sent) {
              sendApiCall(apiCallRequest)
              apiCallRequest.sent = true
            }
          }
          apiCalls = apiCalls.filterNot(apiCallRequest => apiCallRequest.sent)
      }
    }
However, I believe apiCalls is mutable state in this scenario? I would like to know:
Given that your code is running inside an actor, and you seem to be following "the rules", it looks thread-safe to me.
Actors work in terms of a "mailbox", which can receive messages from many sources, possibly concurrently, but will only dispatch the messages to your actor's receive function sequentially. So while the receive function may not always run on the same thread, you can effectively consider your actor as single-threaded. As long as you don't do anything to make code inside your receive function run concurrently with itself, the sequential nature of the actor/mailbox model makes it safe.
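The sequential dispatch described above can be observed with a small experiment. This is only a sketch, assuming Akka classic actors on Scala 2.13; `Counter`, `Increment`, `GetCount` and `CounterDemo` are hypothetical names invented for the illustration. Many threads send to one actor, and the unsynchronized `var` still ends up with every increment applied because the mailbox hands messages to `receive` one at a time.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical messages for this illustration
case object Increment
case object GetCount

class Counter extends Actor {
  // Plain var with no locking: it is only ever touched inside receive
  private var count = 0

  override def receive: Receive = {
    case Increment => count += 1
    case GetCount  => sender() ! count
  }
}

object CounterDemo {
  def run(): Int = {
    val system = ActorSystem("counter-demo")
    import system.dispatcher // ExecutionContext for the Futures below
    implicit val timeout: Timeout = Timeout(5.seconds)

    val counter = system.actorOf(Props(new Counter))

    // Send 1000 messages from whatever pool threads the Futures run on
    val sends = Future.traverse((1 to 1000).toList)(_ => Future(counter ! Increment))
    Await.result(sends, 5.seconds)

    // Asked only after every Increment has been enqueued
    val result = Await.result((counter ? GetCount).mapTo[Int], 5.seconds)
    system.terminate()
    result
  }
}
```

Under these assumptions `CounterDemo.run()` should come back with 1000. The same experiment with a shared `var` mutated directly from the Futures, with no actor in between, could lose updates.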
To illustrate something that would break that guarantee, consider a function that you could call to trigger some asynchronous action with a callback. If your callback were able to interact directly with your actor's internal state, that would not be thread-safe:
    import scala.concurrent.Future
    import scala.util.{Failure, Success}

    def exampleAsyncFunction: Future[Int] = ???

    class MyActor extends Actor {
      import context.dispatcher // ExecutionContext the callback runs on

      private var myInternalState: Int = 0

      override def receive = {
        case SomeMessage =>
          // BAD!
          exampleAsyncFunction.onComplete {
            case Success(i) =>
              // here, you've captured a reference to this actor, and
              // are directly manipulating its internal state, possibly
              // from outside the sequential context of the `receive` function
              myInternalState += i
            case Failure(_) => () // failures ignored in this example
          }
        case SomeOtherMessage =>
          // OK
          myInternalState += 1
      }
    }
The above example would not be thread-safe, because the async callback function triggered by SomeMessage could potentially execute concurrently with the receive logic for SomeOtherMessage, causing conflicting modifications to myInternalState.
A potential way to make the above example safe is to have the async callback interact with the actor's mailbox instead of directly with its internal state:
    def receive = {
      case SomeMessage =>
        val me = self // capture `self` before going into async context
        exampleAsyncFunction.onComplete {
          case Success(i) =>
            // BETTER - sending messages to an actor is safe
            me ! IncrementCounter(i)
          case Failure(_) => () // failures ignored in this example
        }
      case IncrementCounter(i) =>
        // safe to modify internal state because it is a direct response
        // to a received message, as part of the receive loop
        myInternalState += i
      ...
    }
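Akka classic also provides `pipeTo` in `akka.pattern.pipe`, which sends a Future's result to an actor as a message, so the callback never sees the actor's fields at all. A minimal sketch of the same idea follows. `PipingActor`, `GetState`, and the constructor parameter standing in for `exampleAsyncFunction` are assumptions made for the illustration, not part of the original code.

```scala
import akka.actor.{Actor, Status}
import akka.pattern.pipe
import scala.concurrent.Future

// Hypothetical messages for this illustration
case object SomeMessage
case object GetState
final case class IncrementCounter(amount: Int)

// The async function is passed in so the sketch is self-contained
class PipingActor(exampleAsyncFunction: () => Future[Int]) extends Actor {
  import context.dispatcher // ExecutionContext needed by map and pipeTo

  private var myInternalState: Int = 0

  override def receive: Receive = {
    case SomeMessage =>
      // Wrap the result in a message and deliver it to our own mailbox;
      // nothing inside the Future touches myInternalState
      exampleAsyncFunction().map(IncrementCounter(_)).pipeTo(self)

    case IncrementCounter(amount) =>
      // Runs inside the sequential receive loop, so this is safe
      myInternalState += amount

    case Status.Failure(_) =>
      // pipeTo delivers a failed Future as Status.Failure; ignored here
      ()

    case GetState =>
      // Only here so the state can be inspected from outside
      sender() ! myInternalState
  }
}
```

Compared with capturing `self` by hand, this keeps failure handling inside `receive` too, since a failed Future shows up as an ordinary message.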
In the example you gave in your post, it doesn't look like you are doing anything to cause concurrent modifications to your internal state, so it is thread-safe.
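If the `var` and the mutable `sent` flag still feel uncomfortable, one alternative is to hold the pending calls as an immutable value and swap behaviours with `context.become`. The sketch below is not the original Client. `QueueingClient`, `QueuedCall`, `ServerOffline` and the `transport` actor (standing in for whatever actually performs the HTTP call) are all hypothetical, and `ServerOnline` is simplified to a case object.

```scala
import akka.actor.{Actor, ActorRef}

// Hypothetical, simplified messages for this illustration
final case class QueuedCall(route: String)
case object ServerOnline
case object ServerOffline

class QueueingClient(transport: ActorRef) extends Actor {

  // Start offline with nothing queued
  override def receive: Receive = offline(Vector.empty)

  // The queue is a parameter of the behaviour, not a field
  private def offline(pending: Vector[QueuedCall]): Receive = {
    case call: QueuedCall =>
      // Replace the behaviour with one that remembers the extra call
      context.become(offline(pending :+ call))

    case ServerOnline =>
      // Flush in arrival order, then switch to pass-through mode
      pending.foreach(call => transport ! call)
      context.become(online)
  }

  private def online: Receive = {
    case call: QueuedCall => transport ! call
    case ServerOffline    => context.become(offline(Vector.empty))
  }
}
```

This is no more thread-safe than the version with a `var`, since both rely on the same sequential mailbox. It just removes the need to track which calls were already sent, because flushing and emptying the queue happen in a single step.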