I have a restful API that receives an array of JSON messages that will be converted to individual Avro messages and then sent to Kafka. Inside the route, I call 3 different actors: 1) one actor goes out and retrieves the Avro schema from disk 2) then loop through the array of JSON messages and compare it to the Avro schema in a second actor. If any of the messages don't validate, then I need to return a response back to the caller of the API and stop processing. 3) Loop through the array and pass into a 3rd actor that takes the JSON object, converts it to an Avro message and sends to a Kafka topic.
Where I'm having problem getting my head wrapped around is how to stop processing in the route if something fails in one of the actors. I'm passing in the request context to each actor and calling it's complete method but it doesn't seem to immediately stop, the next actor still processes even when it shouldn't. Here is a code snippet of what I'm doing in the route:
post {
entity(as[JsObject]) { membersObj =>
requestContext =>
val membersJson = membersObj.fields("messages").convertTo[JsArray].elements
val messageService = actorRefFactory.actorOf(Props(new MessageProcessingServicev2()))
val avroService = actorRefFactory.actorOf(Props(new AvroSchemaService()))
val validationService = actorRefFactory.actorOf(Props(new JSONMessageValidationService()))
implicit val timeout = Timeout(5 seconds)
val future = avroService ? AvroSchema.MemberSchema(requestContext)
val memberSchema:Schema = Await.result(future, timeout.duration).asInstanceOf[Schema]
for (member <- membersJson) validationService ! ValidationService.MemberValidation(member.asJsObject, memberSchema, requestContext)
for (member <- membersJson) (messageService ! MessageProcessingv2.ProcessMember(member.asJsObject, topicName, memberSchema, requestContext))
I've looked through a lot of blogs/books/slides around this topic and not sure what the best approach is. I've been using Scala/Akka for about 2 months and basically self taught on just the pieces that I've been needing. So any insight that the more seasoned Scala/Akka/Spray developers have in this, it's much appreciated. The one thought I had was to wrapper the 3 actors in a 'master' actor and make each a child of that actor and attempt to approach it like that.
As you are using async processing (!
) you cannot control the processing after your messages have been sent. You would need to use ask (?
) that will return a future you can work with.
But I have a better idea. You could send a message from the first actor to the second. And instead of returning a result to the first actor, you could send the message to the third one to keep on the computation.