I have the following implementation of a User class:
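import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.MutableList
import scala.util.Random
import akka.actor.ActorRef
class User(identifier : Int, actor : ActorRef) extends Serializable {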
var userName : String = Random.alphanumeric.take(4 + Random.nextInt(12)).mkString
var msgRate : Int = 0
var followers : MutableList[User] = new MutableList[User]()
var messageQueue = new LinkedBlockingQueue[String](Messages.maxBufferSize)
override def equals(o : Any) = o match {
case that : User => that.userName.equals(this.userName)
case _ => false
}
// hash on the same field that equals compares, so equal users get equal hashes
override def hashCode = userName.hashCode
def getRecentMessages(n : Int) : List[String] = {
var msgList : List[String] = List.empty[String]
msgList = messageQueue.toArray().toList.asInstanceOf[List[String]]
return msgList
}
def isFollowing(user : User) : Boolean = {
user.getFollowers().contains(this)
}
def isFollowed(user : User) : Boolean = {
followers.contains(user)
}
def getFollowers() : MutableList[User] = {
return followers
}
def addFollower(follower : User) {
followers += follower
}
}
When I run this with a small number of actors, adding followers causes no problems and the code runs fine. With a large number of actors, however, I get the following error:
java.lang.StackOverflowError
at akka.actor.SerializedActorRef$.apply(ActorRef.scala:420)
at akka.actor.LocalActorRef.writeReplace(ActorRef.scala:389)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1075)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
Is this caused by User instances holding references to other User instances through followers? Is there a way to resolve this?
EDIT: Including more code as requested; apologies for the size of the code base. Briefly: I am working on a message simulator similar to Twitter, where messages are managed based on user tagging. The client randomly generates the user base and sends messages at a given rate, and the server manages the data.
The initial handshake consists of sending the entire client information from Interactor(). The server acknowledges each client, and each client is then scheduled regularly. The problem arises when sending the entire client information, which results in a StackOverflowError. In the code, it occurs at the end of the Init case in Interactor().
Here is the code:
object ClientApp extends App {
val system = ActorSystem("TwitterClientActor", ConfigFactory.load("applicationClient.conf"))
val sActor = system.actorFor("akka.tcp://ServerActor@" + ipAddr + "/user/Server")
val serverVector = Vector.fill(Messages.nServers)(sActor)
val serverActor = system.actorOf(Props.empty.withRouter(RoundRobinRouter(routees = serverVector)), "serverRouter")
val interActor = system.actorOf(Props(new Interactor()))
var nRequests : Int = 0
val startTime = java.lang.System.currentTimeMillis()
interActor ! Init
}
// Intermediate manager system
class Interactor() extends Actor {
// Member definitions
import actorSys.dispatcher
// User list
for (i <- 0 until clientList.length)
clientList(i) = new User(i, context.actorOf(Props(new Client(i : Int))))
readFollowersStats(clientList.length)
def receive = {
// Send request to users
case Init =>
for (curUser <- clientList)
serverActor ! RegisterClients(curUser)
// ISSUE IMMEDIATELY AFTER THIS
// Schedule after request
case ScheduleClient(identifier) =>
if (!limitReached) {
val curUser = clientList(identifier)
val cancellable = actorSys.scheduler.schedule(0.milliseconds, curUser.getMsgRate.milliseconds)(sendMsg(curUser))
cancelMap += (curUser -> cancellable)
}
case ClientCompleted =>
nCompleted += 1
if (nCompleted == clientList.length) {
serverActor ! Broadcast(PoisonPill)
context.system.shutdown()
}
}
def sendMsg(curUser : User) = {
nMessages.incrementAndGet()
if (nMessages.get() == Messages.msgLimit) {
println("Limit reached!")
limitReached = true
for (cancellable <- cancelMap.values)
cancellable.cancel()
}
else if (nMessages.get() < Messages.msgLimit) {
println(nMessages)
val curSec = java.lang.System.currentTimeMillis()
val curTime = ((curSec - ClientApp.startTime).toDouble) / 1000
if (curTime >= Messages.peakStart && curTime < Messages.peakEnd) {
for (i <- 0 to Messages.peakScale) {
var rndTweet = randomTweet(curUser)
curUser.getReference() ! Tweet(rndTweet)
}
nMessages.addAndGet(Messages.peakScale - 1)
}
else {
var rndTweet = randomTweet(curUser)
//println(curUser + " ---> " + rndTweet)
curUser.getReference() ! Tweet(rndTweet)
}
}
}
def randomTweet(curUser : User) : String = {
// Return some random string
}
def readFollowersStats(usersCount : Int) {
// Read the file stats of the format, min-max percentage
while(file is not empty)
FollowersGeneration(usersCount, minFollowers.toInt, maxFollowers.toInt, percentage.toDouble)
}
}
def FollowersGeneration(usersCount : Int, minFollowers : Int, maxFollowers : Int, followersPercentage : Double) {
var noOfFollowers : Int = 0
var users : Double = (followersPercentage / 100) * usersCount
var temp : Int = users.toInt
for (i <- 0 until temp) {
if (minFollowers < usersCount) {
// Random follower assignment...
// CODE ACCESSES FOLLOWERS HERE!!!
if (!user.isFollowed(clientList(id)))
user.addFollower(clientList(id))
}
}
}
}
class Client(identifier : Int) extends Actor {
var serverActor = ClientApp.serverActor
def receive = {
case "ACK" =>
println("Client " + identifier + " activated")
ClientApp.interActor ! ScheduleClient(identifier)
case Tweet(tweet) =>
serverActor ! Tweet(tweet)
// Other functions
}
}
EDIT2: Explanation of the use case
This is a client-server model. Client: an Initiator object creates a Broker class. The Broker creates a list of user actors (class User in this case) and establishes relationships between the users, i.e. it assigns random followers, message rates and other properties. The entire list of users is then sent to the server, which activates the individual clients so that they start messaging. Each client then sends random messages to the server, which processes them.
The initial approach was to use a class User as above, store the ActorRef as a member of the class, and send it to the server. This was the issue, so I changed User into an actor class. I still have to generate followers and send them to the server, so I had the Broker add followers through messages. The remaining problem is programmatic: on the server side, I have to access the followers of a User actor. I can either send a request message with '!' or use '?' to get a future, but either way this will slow down the server's processing. Is there a more elegant approach that lets me access a member through the ActorRef, or better still, call a function on it?
The code above is an example of a bad practice: wrapping an actor inside a class. Use the actor model itself rather than approaching the problem from an object-oriented perspective.
In the above scenario, the structure to define is:
Broker -> UserActor -> User
The Broker holds a map, which can be used to resolve any dependencies between users. This keeps the actor model separate from the object logic. Reference
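One way to picture this separation, as a minimal sketch rather than the original design: the message sent to the server is a flat snapshot that refers to followers by id, and a Broker-side map resolves those ids locally. `UserInfo`, `Snapshot` and `BrokerSketch` are hypothetical names that do not appear in the question's code. The sketch uses plain Java serialization (the mechanism visible in the stack trace) and assumes that the overflow comes from serializing users that reference other users.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical flat message: followers are referenced by id, not by object,
// so serializing one user never walks the rest of the follower graph.
case class UserInfo(id: Int, userName: String, msgRate: Int, followerIds: Vector[Int])

object Snapshot {
  // Serialize a value with plain Java serialization and return the bytes.
  def toBytes(value: java.io.Serializable): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    buffer.toByteArray
  }
}

object BrokerSketch {
  // Build n users; user i is followed by user (i + 1) % n (arbitrary demo data).
  def buildUsers(n: Int): Map[Int, UserInfo] =
    (0 until n).map { i =>
      i -> UserInfo(i, s"user$i", msgRate = 10, followerIds = Vector((i + 1) % n))
    }.toMap

  // Resolve follower ids through the registry instead of holding object references.
  def followersOf(registry: Map[Int, UserInfo], id: Int): Vector[UserInfo] =
    registry(id).followerIds.flatMap(fid => registry.get(fid).toList)
}
```

In a real system the registry values would more likely be ActorRefs, with the server keeping its own copy of the map so that a follower lookup is a local map access rather than a message round trip.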
Another alternative is to utilize Futures. This link is a great place to start.
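As a rough sketch of the Futures route: with Akka's ask pattern, `actor ? msg` returns a Future, and the server can compose those futures instead of blocking on each reply. The example below uses only `scala.concurrent`, with a hypothetical `fetchFollowers` function standing in for the ask so that it runs without Akka. All names and data in it are assumptions made for illustration.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FollowerLookup {
  // Hypothetical stand-in for an ask such as `(userActor ? GetFollowers).mapTo[Set[Int]]`.
  def fetchFollowers(store: Map[Int, Set[Int]], id: Int)(implicit ec: ExecutionContext): Future[Set[Int]] =
    Future(store.getOrElse(id, Set.empty[Int]))

  // Start every lookup first, then combine the futures; nothing blocks in between.
  def followersOfAll(store: Map[Int, Set[Int]], ids: Seq[Int])(implicit ec: ExecutionContext): Future[Map[Int, Set[Int]]] = {
    val lookups: Seq[Future[(Int, Set[Int])]] =
      ids.map(id => fetchFollowers(store, id).map(followers => id -> followers))
    Future.sequence(lookups).map(_.toMap)
  }
}

object FollowerLookupDemo {
  import scala.concurrent.ExecutionContext.Implicits.global

  // Blocking with Await is for the demo only; inside an actor one would
  // rather map over the future or pipe the result back as a message.
  def run(): Map[Int, Set[Int]] = {
    val store = Map(1 -> Set(2, 3), 2 -> Set(3))
    Await.result(FollowerLookup.followersOfAll(store, Seq(1, 2, 4)), 5.seconds)
  }
}
```

The point of the design is that the lookups are issued together and combined once, so the cost is roughly one round trip rather than one per user.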