I'm trying to use completionTimeout in an Akka Streams flow. I've provided a contrived example where the flow takes 10 seconds, but I've added a completionTimeout of 1 second. I would expect this flow to time out after 1 second. However, in the example the flow completes in 10 seconds without any errors.
Why doesn't the flow time out? Is there a better way to time out a flow?
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

class Test extends FlatSpec with Matchers {
  implicit val system = ActorSystem("test")

  "This Test" should "fail but passes and I don't know why" in {
    // This takes 10 seconds to complete
    val flow: Flow[String, String, NotUsed] = Flow[String]
      .map(str => {
        println(s"Processing ${str}")
        Thread.sleep(10000)
      })
      .map(_ => {"Done!"})

    val future: Future[String] =
      Source.single("Input")
        .via(flow)
        .completionTimeout(1 second) // Set a timeout of 1 second
        .runWith(Sink.last)

    val result = Await.result(future, 15 seconds)
    result should be("Done!")
  }
}
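When executing a stream, Akka Streams uses operator fusion: by default it fuses the stream's operators so that they all run on a single underlying actor, which is good for performance. Here that means the blocking Thread.sleep also blocks the actor that would have to handle the timeout, so the timeout cannot fire before the element has already passed through. To let the timeout fire while the flow is still blocked, you can introduce an asynchronous boundary with .async: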
val future: Future[String] =
  Source.single("Input")
    .via(flow)
    .async // <--- asynchronous boundary
    .completionTimeout(1 second)
    .runWith(Sink.last)

// onComplete needs an implicit ExecutionContext in scope, e.g. system.dispatcher
future.onComplete(println)
// Processing Input
// Failure(java.util.concurrent.TimeoutException: The stream has not been completed in 1 second.)
Another way to introduce asynchrony is to use the mapAsync flow stage:
val flow: Flow[String, String, NotUsed] = Flow[String]
  .map(str => {
    println(s"Processing ${str}")
    Thread.sleep(10000)
  })
  .mapAsync(1)(_ => Future("Done!")) // <--- asynchronous flow stage
Although you get the same timeout error, you may notice that it takes ~10s to see the result when using mapAsync, whereas it takes only ~1s with async. That's because while mapAsync introduces an asynchronous flow stage, it is not an asynchronous boundary (unlike async) and is still subject to operator fusion.
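As for a better way to time out the flow, one option (a sketch, not part of the original answer) is to move the blocking call itself into the Future passed to mapAsync, so that the stream's actor is never blocked and completionTimeout can fire without an explicit .async. The sketch below assumes Akka 2.6 or later, where an implicit ActorSystem also provides the materializer. The object name NonBlockingTimeoutSketch and the method run are hypothetical, and the default dispatcher is used only for brevity; for real blocking work a dedicated dispatcher would be the safer choice.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical names, for illustration only.
object NonBlockingTimeoutSketch {
  // Assumption: Akka 2.6+, so the implicit system also supplies the materializer.
  implicit val system: ActorSystem = ActorSystem("sketch")

  // Assumption: the default dispatcher is used here for brevity;
  // blocking calls are better run on a dedicated dispatcher.
  implicit val ec: ExecutionContext = system.dispatcher

  // The blocking work runs inside the Future, on a dispatcher thread,
  // rather than inside the fused stream actor.
  val flow: Flow[String, String, NotUsed] = Flow[String]
    .mapAsync(1) { str =>
      Future {
        println(s"Processing $str")
        Thread.sleep(10000)
        "Done!"
      }
    }

  // Runs the stream with a 1 second completion timeout.
  def run(): Future[String] =
    Source.single("Input")
      .via(flow)
      .completionTimeout(1.second)
      .runWith(Sink.last)
}
```

With this arrangement, the future returned by run() would be expected to fail with a TimeoutException after roughly one second. Note that the Future doing the sleeping is not cancelled; it keeps running in the background until the sleep finishes.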