How to throw an exception in an Akka stream?


I would like to throw an exception inside a stream, as follows:

  Source.empty
      .map {
        throw new RuntimeException("Failed")
      }
      .runWith(Sink.foreach(println))
      .onComplete {
        case Success(_) =>
          println()
        case Failure(e) =>
          println(s"Thrown ${e.getMessage}")
      }  

But the exception does not reach the onComplete callback. Instead, the program prints:

Exception in thread "main" java.lang.RuntimeException: Failed
    at com.sweetsoft.App$.main(App.scala:30)
    at com.sweetsoft.App.main(App.scala) 

How can I throw an exception that stops the stream and shows up in onComplete at the end?


Solution

  • Akka Streams has built-in error handling: supervision strategies. A decider maps each exception to a directive (Stop, Resume, or Restart):

    // Decides what the stream does when a stage throws
    val testSupervisionDecider: Supervision.Decider = {
      case ex: java.lang.RuntimeException =>
        println(s"some run time exception ${ex.getMessage}")
        Supervision.Stop // fail the stream with this exception
      case ex: Exception =>
        println(s"Exception occurred and stopping stream: $ex")
        Supervision.Stop
    }
    

    You can then attach the decider to the stream with withAttributes:

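    import akka.actor.ActorSystem
    import akka.stream.{ActorAttributes, Supervision}
    import akka.stream.scaladsl.{Sink, Source}
    import scala.util.{Failure, Success}

    // Assumes Akka 2.6+, where the implicit ActorSystem also provides the materializer.
    // The system name "example" is arbitrary.
    implicit val system: ActorSystem = ActorSystem("example")
    import system.dispatcher // ExecutionContext needed by onComplete

    val list = List.range(1, 100)

    Source(list)
      .map { item =>
        // Every even item throws, which triggers the decider
        if (item % 2 == 0) throw new RuntimeException(s"$item")
        else item
      }
      .withAttributes(ActorAttributes.supervisionStrategy(testSupervisionDecider))
      .runWith(Sink.foreach(println))
      .onComplete {
        case Success(_) =>
          println()
        case Failure(e) =>
          println(s"Thrown ${e.getMessage}")
      }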
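
    A note on the snippet in the question: `.map { throw ... }` takes no parameter, so the braces are an ordinary block that is evaluated while the stream is being built. That is why the exception surfaces on the main thread before the stream ever runs. Source.empty also emits no elements, so a mapping function would never be invoked anyway. Below is a minimal sketch of a variant where the failure does reach the materialized Future. It assumes Akka 2.6+ (the implicit ActorSystem acts as materializer); the object name ThrowInStream, the system name, and the 5-second timeout are illustrative choices, not part of the original code.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical demo object, not from the original post
object ThrowInStream {

  // Runs the stream and reports how its materialized Future completed.
  def run(): Try[Done] = {
    implicit val system: ActorSystem = ActorSystem("throw-in-stream")
    try {
      val done = Source
        .single(1) // at least one element, so the map function is invoked
        // `_ =>` makes this a function: the throw happens inside the stream,
        // not while the stream is being constructed.
        .map[Int](_ => throw new RuntimeException("Failed"))
        .runWith(Sink.foreach[Int](println))

      // Blocking here only keeps the demo simple and deterministic.
      Try(Await.result(done, 5.seconds))
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    run() match {
      case Success(_) => println("Completed")
      case Failure(e) => println(s"Thrown ${e.getMessage}")
    }
}
```

    With the default supervision strategy (Stop), the stream fails with the thrown exception, so the Failure branch should receive it and print "Thrown Failed". Akka may additionally log the stage error.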