Search code examples
scalaunit-testingakka-stream

Validating akka-stream Source in unit test


What is idiomatic way of validating a akka-stream Source in a unit test? I am doing like this:

f.service.fetchData(id).flatMap {
          case Right(source) => {
            // TODO: I need to validate here that source contains "Test value"
          }
          case Left(_) => fail("Wrongly responded with error")
        }

source is basically:

Source.single(ByteString("Test value"))

I tried to execute by connecting Source to a Sink and checking the emitted values, but assertions don't seem to work.


Solution

  • The correct way of testing Sources, Flows and Sinks is by using test probes.

    Given following source the logic of witch you want to test

    val source = Source(Seq("TestValue")).collect {
      case s @ "TestValue" => Right[String, String](s)
      case _               => Left[String, String]("error")
    }
    

    (I know that Left will never trigger here but it's just an example)

    Now, you can define a TestSink that is connected to the given source. The resulting graph is executed in following way

    "assert correct value" in {
      implicit val system: ActorSystem = ???
      val probe = source.toMat(TestSink.probe)(Keep.right).run()
      probe.request(1)
      probe.expectNext(Right("TestValue"))
    }
    
    

    TestSink.probe is a sink that materializes into a TestSubscriber.Probe[T] and it provides controls over the stream. Since it's a sink, it needs to emit demand for an element. It does it via request(1) or request one element. Then it's followed by assert sink.expectNext(Right("TestValue")) that checks that correct value is received.

    There is also a counterpart called TestSource.probe that allows testing of a Sink.

    And by combing both of them, you can test a Flow.