I am trying to track down a bug in a custom Flow where extra requests were going upstream. I wrote a test case which should have failed, given the behaviour and logging I was seeing, but it didn't.
I then made a trivial Flow with an obvious extra pull and yet the test still did not fail.
Here is an example:
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

class EagerFlow extends GraphStage[FlowShape[String, String]] {
  val in: Inlet[String] = Inlet("EagerFlow.in")
  val out: Outlet[String] = Outlet("EagerFlow.out")
  override def shape: FlowShape[String, String] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        println("EagerFlow.in.onPush")
        println("EagerFlow.out.push")
        push(out, grab(in))
        // Deliberate extra pull: asks upstream for more before downstream has requested it.
        println("EagerFlow.in.pull")
        pull(in)
      }
    })
    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        println("EagerFlow.out.onPull")
        println("EagerFlow.in.pull")
        pull(in)
      }
    })
  }
}
Here is a test that should pass, i.e. confirm that there is an extra request:
import akka.stream.scaladsl.{Flow, Keep}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}

"Eager Flow" should "pull on every push" in {
  val sourceProbe = TestSource.probe[String]
  val sinkProbe = TestSink.probe[String]
  val eagerFlow = Flow.fromGraph(new EagerFlow)
  val (source, sink) = sourceProbe
    .via(eagerFlow)
    .toMat(sinkProbe)(Keep.both)
    .run()

  sink.request(1)
  source.expectRequest()
  source.expectNoMsg()
  source.sendNext("hello")
  sink.expectNext()
  source.expectRequest() // the extra request is expected to show up here
}
In standard output you can see:
0C3E08A8 PULL DownstreamBoundary -> TestSpec$EagerFlow@1d154180 (TestSpec$EagerFlow$$anon$4$$anon$6@7e62ecbd) [TestSpec$EagerFlow$$anon$4@36904dd4]
EagerFlow.out.onPull
EagerFlow.in.pull
0C3E08A8 PULL TestSpec$EagerFlow@1d154180 -> UpstreamBoundary (BatchingActorInputBoundary(id=0, fill=0/16, completed=false, canceled=false)) [UpstreamBoundary]
EagerFlow.in.onPush
EagerFlow.out.push
EagerFlow.in.pull
0C3E08A8 PULL TestSpec$EagerFlow@1d154180 -> UpstreamBoundary (BatchingActorInputBoundary(id=0, fill=0/16, completed=false, canceled=false)) [UpstreamBoundary]
assertion failed: timeout (3 seconds) during expectMsg:
java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg:
    at scala.Predef$.assert(Predef.scala:170)
    at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:368)
    at akka.testkit.TestKit.expectMsgPF(TestKit.scala:737)
    at akka.stream.testkit.StreamTestKit$PublisherProbeSubscription.expectRequest(StreamTestKit.scala:665)
    at akka.stream.testkit.TestPublisher$Probe.expectRequest(StreamTestKit.scala:172)
(This is with the debug logging in akka.stream.impl.fusing.GraphInterpreter#processEvent; well, actually using print-points, but effectively the same thing.)
Why does this pull stop in the UpstreamBoundary and never make it back to the TestSource.probe?
I have quite a few tests which use expectNoMsg() to ensure correct backpressure, but it seems these will just give false positives. How should I be testing this if this doesn't work?
For your test to be meaningful, you'll need to set the materializer's input buffer to size 1:
val settings = ActorMaterializerSettings(system)
  .withInputBuffer(initialSize = 1, maxSize = 1)
implicit val mat = ActorMaterializer(settings)
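As an alternative to building the settings in code, the same two values can be overridden in configuration. The fragment below is a sketch that assumes the keys sit under akka.stream.materializer (where the defaults quoted in this answer are defined) and that the test's ActorSystem actually loads this configuration, for example from an application.conf on the test classpath:

```
# Sketch: shrink the stream input buffers for tests only
akka.stream.materializer {
  initial-input-buffer-size = 1
  max-input-buffer-size = 1
}
```

ActorMaterializerSettings(system) reads its values from the system's configuration, so with this override in place the explicit withInputBuffer call should not be needed.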
The default sizes are:
# Initial size of buffers used in stream elements
initial-input-buffer-size = 4
# Maximum size of buffers used in stream elements
max-input-buffer-size = 16
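This means that Akka will eagerly request and buffer up to 16 elements from upstream anyway, starting with your first request. The extra pull from your stage is served from that buffer at the UpstreamBoundary, so it does not show up as a separate request at the TestSource.probe. More information on internal buffers can be found in the Akka Streams documentation on buffers.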
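To illustrate why a single extra pull is invisible to the probe, here is a toy model of a prefetching input boundary. It is not Akka's implementation: the class name PrefetchingBoundaryModel, the up-front prefetch of the whole buffer, and the refill in batches of half the buffer size are all assumptions made for this sketch, chosen to match the behaviour seen in the log above.

```scala
// Hypothetical, simplified model of a prefetching input boundary.
// NOT Akka's code: the name and the batching policy are assumptions.
final class PrefetchingBoundaryModel(bufferSize: Int) {
  require(bufferSize > 0, "bufferSize must be positive")

  // Assumed policy: ask upstream again only after this many elements were consumed.
  private val batchSize = math.max(1, bufferSize / 2)
  private var consumedSinceLastRequest = 0

  // Every request that reaches the upstream publisher, in order.
  // Assumed policy: the whole buffer is requested up front.
  private var sent = Vector(bufferSize)

  def upstreamRequests: Vector[Int] = sent

  // Called when the downstream stage takes one element out of the buffer.
  def elementConsumed(): Unit = {
    consumedSinceLastRequest += 1
    if (consumedSinceLastRequest == batchSize) {
      sent = sent :+ batchSize
      consumedSinceLastRequest = 0
    }
  }
}

object PrefetchingBoundaryDemo extends App {
  val default = new PrefetchingBoundaryModel(16)
  default.elementConsumed()
  println(default.upstreamRequests) // Vector(16)

  val single = new PrefetchingBoundaryModel(1)
  single.elementConsumed()
  println(single.upstreamRequests) // Vector(1, 1)
}
```

In this model, a buffer of 16 produces one large request and then stays silent after the first element, which is what the second expectRequest() in the question runs into. With a buffer of 1, every consumed element triggers a new upstream request, so demand becomes observable element by element. Note that under this assumed policy the boundary itself still requests one element ahead, so assertions built on expectNoMsg() should be checked against the real behaviour rather than taken from this sketch.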