Akka Streams Error Handling. How to know which row failed?


I read this article on Akka Streams error handling:

http://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-error.html

and wrote this code:

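import akka.Done
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

// Restart the failing stage on any Exception; stop the stream on anything else.
val decider: Supervision.Decider = {
  case _: Exception => Supervision.Restart
  case _ => Supervision.Stop
}

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
import actorSystem.dispatcher // execution context needed by future.onComplete

val source = Source(1 to 10)
// Doubles every element except 9, which throws.
val flow = Flow[Int].map{x => if (x != 9) 2 * x else throw new Exception("9!")}
val sink : Sink[Int, Future[Done]] = Sink.foreach[Int](x => println(x))
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder => s =>
  import GraphDSL.Implicits._
  source ~> flow ~> s.in
  ClosedShape
})
val future = graph.run()
// Shut the actor system down once the stream has completed.
future.onComplete{ _ =>
  actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)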

This works very well, except that I have to scan the output to see which row was not processed. Is there a way to print or log the row that failed, without putting explicit try/catch blocks in every flow I write?

For example, if I were using actors (as opposed to streams), I could have hooked into an actor life cycle event and logged each restart together with the message that was being processed at the time.

Here, however, I am not using actors explicitly (although they are used internally). Are there life cycle events for a Flow / Source / Sink?


Solution

  • Just a small modification to your code:

    val decider: Supervision.Decider = {
      case e: Exception =>
        println("Exception handled, recovering stream:" + e.getMessage)
        Supervision.Restart
      case _ => Supervision.Stop
    }
    

    If the exceptions thrown in your stream carry meaningful messages (the row being processed, for example), the supervision decider can print them.

    I used println to keep the answer short, but I strongly recommend using a logging library such as scala-logging instead.
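
As a rough sketch of that idea, one way to get the failing row into the exception without a try/catch in every flow is to wrap the function passed to `map` once. This assumes Akka 2.5.x, as in the linked docs. `RowFailed`, `tagged`, and the `FailedRowLogging` object are hypothetical names made up for this illustration and are not part of Akka. The actor system's own logger (`system.log`) stands in for a logging library here.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NonFatal

object FailedRowLogging {
  // Hypothetical exception type: carries the element that was being processed.
  final class RowFailed(val row: Any, cause: Throwable)
    extends RuntimeException(s"row $row failed: ${cause.getMessage}", cause)

  // Hypothetical helper: the single place where the try/catch lives.
  // It rethrows any non-fatal failure of f together with the input element.
  def tagged[A, B](f: A => B): A => B = a =>
    try f(a)
    catch { case NonFatal(e) => throw new RowFailed(a, e) }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()

    // Log the failed row, then restart the stage so the stream keeps going.
    val decider: Supervision.Decider = {
      case e: RowFailed =>
        system.log.error(e, "Row {} failed, recovering stream", e.row)
        Supervision.Restart
      case _ => Supervision.Stop
    }

    implicit val materializer: ActorMaterializer = ActorMaterializer(
      ActorMaterializerSettings(system).withSupervisionStrategy(decider))

    // Same logic as in the question, wrapped once with `tagged`.
    val flow = Flow[Int].map(
      tagged((x: Int) => if (x != 9) 2 * x else throw new Exception("9!")))

    val done = Source(1 to 10).via(flow).runWith(Sink.foreach[Int](println))

    Await.result(done, 10.seconds)
    Await.result(system.terminate(), 10.seconds)
  }
}
```

The decider only sees the `Throwable`, so whatever you want logged has to travel inside the exception; the wrapper is just a convenient way to put it there in one spot.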