scala, scalatest, akka-stream

Why does the Akka Streams TestSource not receive every pull as a request?


I am trying to track down a bug in a custom Flow that sends extra requests upstream. I wrote a test case that should have failed given the behaviour and logging I was seeing, but it didn't.

I then made a trivial Flow with an obvious extra pull and yet the test still did not fail.

Here is an example:

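import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

class EagerFlow extends GraphStage[FlowShape[String, String]] {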

  val in: Inlet[String] = Inlet("EagerFlow.in")
  val out: Outlet[String] = Outlet("EagerFlow.out")

  override def shape: FlowShape[String, String] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = {
        println("EagerFlow.in.onPush")
        println("EagerFlow.out.push")
        push(out, grab(in))
        // Deliberate extra pull: asks upstream for the next element before downstream has pulled again.
        println("EagerFlow.in.pull")
        pull(in)
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = {
        println("EagerFlow.out.onPull")
        println("EagerFlow.in.pull")
        pull(in)
      }
    })
  }
}

Here is a test that should pass, i.e. it should confirm that there is an extra request:

"Eager Flow" should "pull on every push" in {
  val sourceProbe = TestSource.probe[String]
  val sinkProbe = TestSink.probe[String]

  val eagerFlow = Flow.fromGraph(new EagerFlow)

  val (source, sink) = sourceProbe
    .via(eagerFlow)
    .toMat(sinkProbe)(Keep.both)
    .run()

  sink.request(1)
  source.expectRequest()
  source.expectNoMsg()

  source.sendNext("hello")
  sink.expectNext()
  source.expectRequest()
}

In standard output you can see:

0C3E08A8 PULL DownstreamBoundary -> TestSpec$EagerFlow@1d154180 (TestSpec$EagerFlow$$anon$4$$anon$6@7e62ecbd) [TestSpec$EagerFlow$$anon$4@36904dd4]
EagerFlow.out.onPull
EagerFlow.in.pull
0C3E08A8 PULL TestSpec$EagerFlow@1d154180 -> UpstreamBoundary (BatchingActorInputBoundary(id=0, fill=0/16, completed=false, canceled=false)) [UpstreamBoundary]
EagerFlow.in.onPush
EagerFlow.out.push
EagerFlow.in.pull
0C3E08A8 PULL TestSpec$EagerFlow@1d154180 -> UpstreamBoundary (BatchingActorInputBoundary(id=0, fill=0/16, completed=false, canceled=false)) [UpstreamBoundary]

assertion failed: timeout (3 seconds) during expectMsg:
java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg:
  at scala.Predef$.assert(Predef.scala:170)
  at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:368)
  at akka.testkit.TestKit.expectMsgPF(TestKit.scala:737)
  at akka.stream.testkit.StreamTestKit$PublisherProbeSubscription.expectRequest(StreamTestKit.scala:665)
  at akka.stream.testkit.TestPublisher$Probe.expectRequest(StreamTestKit.scala:172)

(The PULL lines correspond to the debug logging in akka.stream.impl.fusing.GraphInterpreter#processEvent. Strictly speaking I used print-points, but the effect is the same.)

Why does this pull stop in the UpstreamBoundary and never make it back to the TestSource.probe?

I have quite a few tests that use expectNoMsg() to verify correct backpressure, but it seems they will just give false positives. How should I test this if that approach doesn't work?


Solution

  • For your test to be meaningful, you need to set the stream's input buffer to size 1:

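    // Assumes an ActorSystem named `system` is already in (implicit) scope.
    import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
    val settings = ActorMaterializerSettings(system)
      .withInputBuffer(initialSize = 1, maxSize = 1)
    implicit val mat = ActorMaterializer(settings)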
    

    The default sizes are:

      # Initial size of buffers used in stream elements
      initial-input-buffer-size = 4
      # Maximum size of buffers used in stream elements
      max-input-buffer-size = 16
    

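    This means that Akka will eagerly request and buffer up to 16 elements from upstream regardless, so individual pulls from your stage are absorbed by that buffer and do not each reach the TestSource as a request. More information on internal buffers can be found in the Akka Streams documentation on buffers.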
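
To see why an extra pull may never show up at the TestSource, here is a toy model of a batching input buffer. It is a sketch under stated assumptions, not Akka's actual BatchingActorInputBoundary: the names BatchingInputModel, BatchingInputDemo and requestsSeenBySource are hypothetical, and the refill rule (request the whole buffer upfront, then re-request max(1, size / 2) once that many elements have been handed on) is an assumption made for illustration.

```scala
import scala.collection.mutable

// Toy model only: NOT Akka's real BatchingActorInputBoundary.
// Assumption: the full buffer size is requested upfront, and a further
// batch of max(1, size / 2) is requested each time that many elements
// have been handed downstream.
final class BatchingInputModel(bufferSize: Int) {
  private val batchSize = math.max(1, bufferSize / 2)
  private var batchRemaining = batchSize
  private val buffer = mutable.Queue.empty[String]

  // Every request(n) that would reach the upstream publisher (the TestSource).
  // The initial entry models the upfront request for the whole buffer.
  val upstreamRequests: mutable.ListBuffer[Int] = mutable.ListBuffer(bufferSize)

  // Upstream delivers an element into the buffer.
  def onNext(elem: String): Unit = buffer.enqueue(elem)

  // The stage pulls. An empty buffer just parks the pull: nothing goes upstream.
  def pull(): Option[String] =
    if (buffer.isEmpty) None
    else {
      val elem = buffer.dequeue()
      batchRemaining -= 1
      if (batchRemaining == 0) {
        // A whole batch has been consumed, so ask upstream for another one.
        upstreamRequests += batchSize
        batchRemaining = batchSize
      }
      Some(elem)
    }
}

object BatchingInputDemo {
  // Replays the sequence from the test against the model and returns
  // the requests the upstream publisher would have observed.
  def requestsSeenBySource(bufferSize: Int): List[Int] = {
    val boundary = new BatchingInputModel(bufferSize)
    boundary.pull()          // first pull, buffer is empty
    boundary.onNext("hello") // source.sendNext("hello")
    boundary.pull()          // the element is handed to the stage
    boundary.pull()          // the extra, eager pull
    boundary.upstreamRequests.toList
  }
}
```

In this model, BatchingInputDemo.requestsSeenBySource(16) gives List(16): the source sees one upfront request and nothing more, which matches the timeout in the question. BatchingInputDemo.requestsSeenBySource(1) gives List(1, 1), so a second request becomes observable once the buffer size is 1.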