
Akka actor get remaining message list


I have an actor that performs an intensive computation, and ideally only the last result should count.

If it receives multiple messages of the same type A(data), I would like only the last one to be handled and the earlier ones discarded.

How can I achieve that?


Solution

  • Custom mailbox

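    You can try implementing a custom mailbox that holds at most one message, so that a newer message overwrites an older one that has not been processed yet. Note that such a mailbox keeps only the latest message of any type, not just A: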

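    import java.util.concurrent.atomic.AtomicReference
    
    import akka.actor.{ActorRef, ActorSystem}
    import akka.dispatch._
    import com.typesafe.config.Config
    
    // Holds at most one envelope; a newer message overwrites an unprocessed older one.
    class SingleMessageQueue extends MessageQueue {
      // enqueue may be called from several threads at once, so the slot is atomic
      private val message = new AtomicReference[Envelope]()
    
      def enqueue(receiver: ActorRef, handle: Envelope): Unit = message.set(handle)
      // MessageQueue.dequeue returns null when the queue is empty
      def dequeue(): Envelope = message.getAndSet(null)
      def numberOfMessages: Int = if (hasMessages) 1 else 0
      def hasMessages: Boolean = message.get() != null
      def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit =
        Option(dequeue()).foreach(deadLetters.enqueue(owner, _))
    }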
    
    final case class SingleMessageMailbox() extends MailboxType with ProducesMessageQueue[SingleMessageQueue] {
    
      def this(settings: ActorSystem.Settings, config: Config) = this()
    
      override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new SingleMessageQueue
    }
    

    Then enable it for your actor as described in the mailbox section of the Akka documentation.
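
    Enabling the mailbox could look roughly like the sketch below. It assumes the SingleMessageMailbox class above is compiled into a package named com.example; that package name, the config key single-message-mailbox, and the Computer actor are hypothetical names used only for illustration.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// Hypothetical actor standing in for the one doing the intensive computation.
class Computer extends Actor {
  def receive = {
    case data => () // the intensive work on `data` would go here
  }
}

object MailboxSetup {
  def main(args: Array[String]): Unit = {
    // Assumption: SingleMessageMailbox lives in package `com.example`.
    // `mailbox-type` takes the fully qualified class name of the MailboxType.
    val config = ConfigFactory.parseString(
      """
        |single-message-mailbox {
        |  mailbox-type = "com.example.SingleMessageMailbox"
        |}
      """.stripMargin).withFallback(ConfigFactory.load())

    val system = ActorSystem("demo", config)

    // Select the mailbox for this actor by its config path.
    val computer = system.actorOf(
      Props[Computer]().withMailbox("single-message-mailbox"), "computer")
  }
}
```

    The same mailbox block can also live in application.conf instead of being parsed from a string.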

  • Split actors

    You can introduce a pair of actors:

    • Manager, which receives a job and forwards it to the Worker whenever the Worker is idle
    • Worker, which does the actual work and notifies its manager when it is done

    Example:

    import akka.actor.{Actor, ActorRef, Props}
    
    object Worker {
      case class Job()
      case object JobDone
    }
    
    import Worker.{Job, JobDone}
    
    class Worker extends Actor {
      override def receive = {
        case Job() =>
          // your long job
          context.parent ! JobDone
      }
    }
    
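    class Manager extends Actor {
      // Most recent job received while the worker was busy, with its original sender
      var nextJob = Option.empty[(Job, ActorRef)]
      val worker = context.actorOf(Props[Worker])
    
      // Worker is busy: remember only the latest job
      def working: Receive = {
        case job: Job => nextJob = Some((job, sender()))
        case JobDone =>
          nextJob match {
            case Some((job, snd)) =>
              nextJob = None // clear the slot so the same job is not run again
              worker.tell(job, snd)
            case None => context.become(free)
          }
      }
    
      // Worker is idle: hand the job over immediately
      def free: Receive = {
        case job: Job =>
          worker.tell(job, sender())
          context.become(working)
      }
    
      override def receive = free
    }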
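
    To see which jobs actually reach the worker, the Manager's intended state machine can be traced without Akka. The following is a plain-Scala model, not actor code; Coalescer and its method names are hypothetical and exist only for this illustration.

```scala
import scala.collection.mutable.ListBuffer

// Plain-Scala model of the Manager's free/working state machine (no Akka).
final class Coalescer[A] {
  private var busy = false               // true corresponds to the `working` behaviour
  private var pending = Option.empty[A]  // corresponds to `nextJob`
  val started = ListBuffer.empty[A]      // jobs that were handed to the worker

  // A new job arrives: start it if idle, otherwise overwrite the pending slot.
  def submit(job: A): Unit =
    if (busy) pending = Some(job)
    else {
      busy = true
      started += job
    }

  // The worker reports completion: start the pending job, or go idle.
  def done(): Unit = pending match {
    case Some(job) =>
      pending = None
      started += job
    case None =>
      busy = false
  }
}

object CoalescerDemo {
  def main(args: Array[String]): Unit = {
    val c = new Coalescer[Int]
    c.submit(1) // idle, so job 1 starts immediately
    c.submit(2) // busy: job 2 is parked
    c.submit(3) // busy: job 3 replaces job 2
    c.done()    // job 1 finished, job 3 starts
    c.done()    // job 3 finished, nothing pending, back to idle
    println(c.started.toList) // List(1, 3)
  }
}
```

    Job 2 never runs: it is overwritten while job 1 is still in progress, which is the discard-all-but-the-latest behaviour the question asks for.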