Search code examples
scalaakkaakka-streamreactive-streams

Testing Akka Reactive Streams


I'm testing code which streams messages over an outgoing stream TCP connection obtained via:

(IO(StreamTcp) ? StreamTcp.Connect(settings, address))
.mapTo[StreamTcp.OutgoingTcpConnection]
.map(_.outputStream)

In my tests, I substitute the resulting Subscriber[ByteString] with a dummy subscriber, trigger some outgoing messages, and assert that have arrived as expected. I use the method below to produce the dummy subscriber and stream result future. (So far, so good)

def testSubscriber[T](settings: FlowMaterializer)(implicit ec: ExecutionContext): (Subscriber[T], Future[Seq[T]]) = {
  var sent = Seq.empty[T]
  val (subscriber, streamComplete) = 
    Duct[T].foreach( bs => sent = sent :+ bs)(settings)
  (subscriber, streamComplete.map( _ => sent ))
}

My question is this: is there some canonical method for testing that streams output the expected values, something similar to Akka's TestActorRef? And if not, is there some library function similar to the above function?


Solution

  • Testing streams is possible with the akka-streams-testkit.
    Read about it here: http://doc.akka.io/docs/akka/current/scala/stream/stream-testkit.html