Testing Akka Reactive Streams

I'm testing code that streams messages over an outgoing TCP connection obtained via:

(IO(StreamTcp) ? StreamTcp.Connect(settings, address))

In my tests, I substitute the resulting Subscriber[ByteString] with a dummy subscriber, trigger some outgoing messages, and assert that they have arrived as expected. I use the method below to produce the dummy subscriber and a future holding the streamed elements. So far, so good.

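def testSubscriber[T](settings: FlowMaterializer)(implicit ec: ExecutionContext): (Subscriber[T], Future[Seq[T]]) = {
  // Accumulates every element the dummy subscriber receives.
  var sent = Seq.empty[T]
  val (subscriber, streamComplete) =
    Duct[T].foreach(bs => sent = sent :+ bs)(settings)
  // Once the stream completes, yield everything that was collected.
  (subscriber, streamComplete.map(_ => sent))
}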

My question is this: is there some canonical method for testing that streams output the expected values, something similar to Akka's TestActorRef? And if not, is there some library function similar to the above function?


  • Testing streams is possible with the akka-streams-testkit.
    Read about it here:
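
    As a rough sketch of what a testkit-based check might look like: the example below assumes a recent Akka release (2.6 or later) with the akka-stream-testkit module on the classpath, which is a much newer API than the Duct/FlowMaterializer one used in the question. The object name StreamProbeExample and the hard-coded Source standing in for the real outgoing messages are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import akka.util.ByteString

// Hypothetical example object; not part of Akka or of the question's code.
object StreamProbeExample {

  def verify(): Unit = {
    // In Akka 2.6+ an implicit ActorSystem is enough to materialize streams.
    implicit val system: ActorSystem = ActorSystem("stream-test")
    try {
      // Stand-in for the code under test that produces outgoing messages.
      val outgoing = Source(List(ByteString("hello"), ByteString("world")))

      outgoing
        .runWith(TestSink.probe[ByteString]) // materializes to a TestSubscriber.Probe
        .request(2)                          // signal demand for two elements
        .expectNext(ByteString("hello"), ByteString("world"))
        .expectComplete()                    // fails if the stream does not complete
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = verify()
}
```

    The probe plays the role of the dummy subscriber from the question: it controls demand explicitly and fails with an assertion error if the elements or the completion signal do not arrive as expected, so no mutable accumulator or hand-rolled future is needed.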