OutOfMemoryError using Akka Actors


I have an application that consumes messages from RabbitMQ, and I'm using Actors to handle the work.

Here is my approach:

object QueueConsumer extends Queue {

  def consumeMessages = {
    setupListener(buildChannel(resultsQueueName), resultsQueueName,
        resultsCallback)
  }

  private def setupListener(receivingChannel: Channel, queue: String, 
        f: (String) => Any) {
    Akka.system.scheduler.scheduleOnce(Duration(10, TimeUnit.SECONDS),
      Akka.system.actorOf(Props(new QueueActor(receivingChannel, queue, f))), "")
  }

}

class QueueActor(channel:Channel, queue:String, f:(String) => Any) extends Actor{

  def receive = {
    case _ => startReceiving
  }

  def startReceiving = {
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume(queue, false, consumer)
    while (true) {
      val delivery = consumer.nextDelivery()
      val msg = new String(delivery.getBody())
      context.actorOf(Props(new Actor {
        def receive = {
          case some: String => f(some)
        }
      })) ! msg
      channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
    }
  }

}

After running for a few seconds, it throws a java.lang.OutOfMemoryError: GC overhead limit exceeded.

I think this happens because I'm starting a new Actor for every message I receive, so if I have 100000 messages, it creates 100000 actors. Is this a good approach, or should I implement something like an 'actor pool'?

Does anyone have an idea how I can avoid the OutOfMemoryError in my scenario?

Thanks in advance.

Edit 1:

I changed the approach to:

class Queue2(json:String) extends Actor {

  def receive = {
    case x: String =>
      val envelope = MessageEnvelopeParser.toObject(x)
      val processor = ProcessQueueServiceFactory.getProcessResultsService()
      envelope.messages.foreach(message => processor.process(message))
  }

}

object Queue2 {
  def props(json: String): Props = Props(new Queue2(json))
}

class QueueActor(channel:Channel, queue:String) extends Actor {

  def receive = {
    case _ => startReceiving
  }

  def startReceiving = {
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume(queue, false, consumer)
    while (true) {
      val delivery = consumer.nextDelivery()
      val msg = new String(delivery.getBody())
      context.actorOf(Queue2.props(msg))
      channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
    }
  }
}

Solution

  • Your per-message actors need to stop themselves when they are finished; otherwise they stay around forever, and every actor that is still alive keeps holding memory. See the Akka docs on the Actor lifecycle and on stopping Actors. Here you just need to call context.stop(self) once the processing is finished.
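
The advice above can be sketched roughly as follows. This is a minimal illustration, not the asker's actual code: OneShotWorker and OneShotDemo are hypothetical names, the callback f stands in for the real processing, and the classic Akka actor API (2.4 or later, for system.terminate()) is assumed.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical per-message worker: handles one String, then stops itself
// so that it can be garbage collected instead of living forever.
class OneShotWorker(f: String => Any) extends Actor {
  def receive = {
    case msg: String =>
      f(msg)
      context.stop(self) // without this call the actor is never released
  }
}

object OneShotWorker {
  def props(f: String => Any): Props = Props(new OneShotWorker(f))
}

// Small demo (assumed setup): one worker per message, each stopping after its work.
object OneShotDemo extends App {
  val system = ActorSystem("demo")
  val done = new CountDownLatch(3)

  Seq("a", "b", "c").foreach { msg =>
    // The message has to be sent with `!`; creating the actor alone does not run receive.
    system.actorOf(OneShotWorker.props { s => println(s); done.countDown() }) ! msg
  }

  done.await(5, TimeUnit.SECONDS)
  system.terminate() // Akka 2.4+; older versions use system.shutdown()
}
```

Inside QueueActor the same worker would be created with context.actorOf(OneShotWorker.props(f)) ! msg. Note that if f throws, context.stop(self) is not reached in this sketch, so failure handling would need separate treatment.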