I would like to throw an exception as follows:
    Source.empty
      .map {
        throw new RuntimeException("Failed")
      }
      .runWith(Sink.foreach(println))
      .onComplete {
        case Success(_) =>
          println()
        case Failure(e) =>
          println(s"Thrown ${e.getMessage}")
      }
But the exception does not appear in the onComplete method. Instead, it prints:
Exception in thread "main" java.lang.RuntimeException: Failed
at com.sweetsoft.App$.main(App.scala:30)
at com.sweetsoft.App.main(App.scala)
How can I throw an exception that stops the stream and shows up at the end, in onComplete?
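The stack trace points at main rather than at the stream, which suggests the exception is thrown while the stream is still being built. The block passed to map has no parameter, so it is an ordinary expression evaluated on the spot, not a function that runs per element. Here is a minimal sketch of that difference in plain Scala, without Akka. `register` is a hypothetical stand-in for an operator such as map; it stores its argument and never calls it.

```scala
import scala.util.Try

object EagerThrowDemo {
  // Hypothetical stand-in for a stream operator: accepts a function but never applies it.
  def register(f: Int => Int): String = "registered"

  // Block without a parameter: the throw runs while the argument is evaluated,
  // before register is even entered.
  def eager: Try[String] =
    Try(register { throw new RuntimeException("Failed") })

  // Function literal: the throw is deferred until the function is applied,
  // which never happens here.
  def deferred: Try[String] =
    Try(register { _ => throw new RuntimeException("Failed") })

  def main(args: Array[String]): Unit = {
    println(s"block without parameter failed early: ${eager.isFailure}")
    println(s"function literal failed early: ${deferred.isFailure}")
  }
}
```

The first call compiles because `throw` has type Nothing, which conforms to any expected type, including `Int => Int`.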
Akka has built-in error handling: Akka Supervision Strategies.
    import akka.stream.Supervision

    // Decides what the stream does when a stage throws; Stop fails the stream.
    val testSupervisionDecider: Supervision.Decider = {
      case ex: java.lang.RuntimeException =>
        println(s"some run time exception ${ex.getMessage}")
        Supervision.Stop
      case ex: Exception =>
        println(s"Exception occurred, stopping stream: ${ex.getMessage}")
        Supervision.Stop
    }
You can then use the supervision decider as follows:
    val list = List.range(1, 100)
    Source(list).map { item =>
      if ((item % 2) == 0) {
        throw new RuntimeException(s"$item")
      } else {
        item
      }
    }.withAttributes(ActorAttributes.supervisionStrategy(testSupervisionDecider))
      .runWith(Sink.foreach(println)).onComplete {
        case Success(_) =>
          println()
        case Failure(e) =>
          println(s"Thrown ${e.getMessage}")
      }
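Applied to the snippet from the question, two changes seem to be needed for the failure to reach onComplete. The argument to map has to be a function literal (`item => ...`), so the throw happens inside the running stream. The source also has to emit at least one element, because Source.empty never invokes map. The sketch below assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer. The names `FailingStreamSketch` and `run` are made up for illustration. No custom decider is attached, since stopping the stream on an exception is the default behaviour.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future
import scala.util.{Failure, Success}

object FailingStreamSketch {
  // Hypothetical helper: runs a small stream that fails on its second element.
  def run()(implicit system: ActorSystem): Future[Done] =
    Source(1 to 3)
      .map { item =>
        // Thrown inside the stream, so it fails the materialized Future.
        if (item == 2) throw new RuntimeException("Failed") else item
      }
      .runWith(Sink.foreach(println))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    import system.dispatcher // ExecutionContext for onComplete

    run().onComplete { result =>
      result match {
        case Success(_) => println("Completed")
        case Failure(e) => println(s"Thrown ${e.getMessage}")
      }
      system.terminate() // shut down so the JVM can exit
    }
  }
}
```

A custom decider like the one above is mainly useful when you want logging or a different directive, such as Supervision.Resume, which drops the failing element and keeps the stream running.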