Search code examples
scalaakkaakka-actor

Sending Response outside akka Actor System


I have a play(2.4.2 which has akka 2.4.18) application in which I am using akka actors for uploading a file. I have a parent supervisor Actor with this kind of hierarchy

UploadSupervisor ---child---> UploadActor ---child--->
DataWriteActor & MetaWriteActor

The leaf actors MetaWriteActor & DataWriteActor does the actual writing. A very simplified version of my code is below:

First I have a actor supervisor:

class UploadSupervisor extends Actor {
  val uploadActor = context.actorOf(Props(new UploadActor), "UploadActor") 
 override def supervisorStrategy = OneForOneStrategy() {
    case _: Throwable => Restart
 }

override def receive: Receive = {
  case data: Data => uploadActor ! data
  case meta: MetaInfo => uploadActor ! meta
  //How do I send response outside of actor system?
  case dataSuccess: DataUploadResponse => ??? //Line 10
  case metaSuccess: MetaUploadResponse => ??? //Line 11

}

object UploadSupervisor {
  val uploadSupervisor = Akka.system
    .actorOf(Props(new UploadSupervisor), "UploadSupervisor")
}
//Request & Response case classes
case class Data(content: String)
case class MetaInfo(id: String, createdDate: Timestamp)

case class DataUploadResponse(location: String)
case class MetaUploadResponse(location: String)

UploadActor:-

class UploadActor extends Actor {  
val dataWriteActor = context.actorOf(Props(new DataWriteActor), "dataWriteActor")  
val metaWriteActor = context.actorOf(Props(new MetaWriteActor), "UploadActor")

override def receive = {   
case data: Data => dataWriteActor ! data   
case meta: MetaInfo => metaWriteActor ! meta   
case dataResp: DataUploadResponse => context.parent ! dataResp   
case metaResp: MetaUploadResponse => context.parent ! metaResp 

 }
}

DataWriteActor :

class DataWriteActor extends Actor {
  case data: Data => //Do the writing 
                     println("data write completed")
                     sender() ! DataUploadResponse("someLocation")  

}

MetaWriteActor

class MetaWriteActor extends Actor {
  case meta: MetaInfo=> //Do the writing 
                     println(" meta info writing completed")
                     sender() ! MetaUploadResponse("someOtherLocation")  

}

Somewhere outside Actor system:-

implicit val timeout = Timeout(10 seconds)
val f1 = UploadSupervisor.uploadSupervisor ? Data("Hello Akka").mapTo(implicitly[scala.reflect.ClassTag[DataUploadResponse]])

val f2 = UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime).mapTo(implicitly[scala.reflect.ClassTag[MetaUploadResponse]])

//Do something with futures

The question is how to send the response outside the actor system? Because in Line 10 & 11, I can't use sender ! msg because the current sender is the UploadActor.


Solution

  • You could keep in UploadSupervisor references to the initial senders:

    class UploadSupervisor extends Actor {
      val uploadActor = context.actorOf(Props[UploadActor], "UploadActor")
    
      override val supervisorStrategy = OneForOneStrategy() {
        case _ => Restart
      }
    
      var dataSender: Option[ActorRef] = None
      var metaSender: Option[ActorRef] = None
    
      def receive = {
        case data: Data =>
          val s = sender
          dataSender = Option(s)
          uploadActor ! data
        case meta: MetaInfo =>
          val s = sender
          metaSender = Option(s)
          uploadActor ! meta
        case dataSuccess: DataUploadResponse =>
          dataSender.foreach(_ ! dataSuccess)
        case metaSuccess: MetaUploadResponse =>
          metaSender.foreach(_ ! metaSuccess)
      }
    }
    

    To send messages to UploadSupervisor:

    implicit val timeout = Timeout(10 seconds)
    
    val f1 = (UploadSupervisor.uploadSupervisor ? Data("Hello Akka")).mapTo[DataUploadResponse]
    
    val f2 = (UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime)).mapTo[MetaUploadResponse]
    

    The above assumes that you're sending one Data message and one MetaInfo message to UploadSupervisor at a time. This approach will break down if you send multiple Data and MetaInfo messages and expect concurrent replies. A more general solution is to include the reference to the initial sender in additional case classes that wrap your existing case classes, passing this reference through your actor hierarchy:

    case class DataMsg(data: Data, target: ActorRef)
    case class MetaInfoMsg(metaInfo: MetaInfo, target: ActorRef)
    
    case class DataUploadMsg(response: DataUploadResponse, target: ActorRef)
    case class MetaUploadMsg(response: MetaUploadResponse, target: ActorRef)
    
    class UploadSupervisor extends Actor {
      val uploadActor = context.actorOf(Props[UploadActor], "UploadActor")
    
      override val supervisorStrategy = OneForOneStrategy() {
        case _ => Restart
      }
    
      def receive = {
        case data: Data =>
          val s = sender
          uploadActor ! DataMsg(data, s)
        case meta: MetaInfo =>
          val s = sender
          uploadActor ! MetaInfoMsg(meta, s)
        case DataUploadMsg(response, target) =>
          target ! response
        case MetaUploadMsg(response, target) =>
          target ! response
      }
    }
    

    The UploadActor:

    class UploadActor extends Actor {  
      val dataWriteActor = context.actorOf(Props[DataWriteActor], "dataWriteActor")  
      val metaWriteActor = context.actorOf(Props[MetaWriteActor], "UploadActor")
    
      def receive = {   
        case data: DataMsg => dataWriteActor ! data   
        case meta: MetaInfoMsg => metaWriteActor ! meta   
        case dataResp: DataUploadMsg => context.parent ! dataResp   
        case metaResp: MetaUploadMsg => context.parent ! metaResp 
      }
    }
    

    The writers:

    class DataWriteActor extends Actor {
      def receive = {
        case DataMsg(data, target) =>
          // do the writing 
          println("data write completed")
          sender ! DataUploadMsg(DataUploadResponse("someLocation"), target)
      }
    }
    
    class MetaWriteActor extends Actor {
      def receive = {
        case MetaInfoMsg(meta, target) =>
          // do the writing 
          println("meta info writing completed")
          sender ! MetaUploadMsg(MetaUploadResponse("someOtherLocation"), target) 
      }
    }