Search code examples
scalaakkaakka-stream

Why doesn't the stream continue if the error occurs inside the Source


I have written a scala stream application. My objective is that if during the stream processing an item encounters an error during processing, then the stream ignores it and continues processing the remaining items.

Towards this goal, I wrote this code

object StreamRestart extends App {
   implicit val actorSystem = ActorSystem()
   implicit val ec : ExecutionContext = actorSystem.dispatcher
   var failed: ListBuffer[Int] = ListBuffer()
   val decider : Supervision.Decider = {
      case x : Exception => {
         println(s"failed with error $x")
         failed += x.getMessage.toInt
         Supervision.Restart
      }
   }
   implicit val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
   def source : Source[Int, NotUsed] = {
      val iter  = (1 to 1000 toStream) map {x => if (x == 600) throw new Exception(x.toString) else x}
      Source(iter)
   }

   val sink : Sink[Int, Future[Done]] = Sink.foreach[Int]{i => println(i)}
   val future : Future[Done] = source.runWith(sink)
   val f = future.map{_ =>
      println("completed")
      actorSystem.terminate()
   }
   Await.result(f, Duration.Inf)
   println(s"who failed ${failed.toList}")
}

The above code crashes at 600 even though my decider says "Restart". If I move this exception side a "flow" then the decider works and the stream processes till it reaches 1000. but if the error occurs inside the source function, the application crashes.

Is there a way to make my application foolproof in a way that it always reaches the end. Otherwise how do we recover from errors when it occurs in the source function.


Solution

  • An error in the source is meant to be an unrecoverable failure on that source instance, which means the stream cannot continue using that source but you need to switch to a different source: it makes sense that if you are listening from a websocket and the http server goes down, you will not be able to listen again on that websocket.

    Akka streams offers recoverWithRetries as described in the documentation to move to another source, or more generically to replace a part of the stream with a stream element with the same shape.