Tags: scala, akka-stream, lagom, backpressure

Backpressure between Lagom services using streaming ServiceCall not working


Lagom: 1.5.4

Consider a ServiceCall like this (example):

def stream: ServiceCall[NotUsed, Source[Int, NotUsed]] = ServiceCall { _ =>
  Future.successful(
    // emit 1 to 1000 as fast as downstream demand allows, logging every element sent
    Source(1.to(1000)).wireTap(msg => log.info(s"sending $msg"))
  )
}

When another service (example) consumes this ServiceCall, e.g.:

// assumes an implicit Materializer and ExecutionContext in scope, plus scala.concurrent.duration._
val ticker = Source.tick(1.second, 100.millis, true)
helloWorldStreamService.stream.invoke().flatMap(_.zip(ticker).map {
  case (msg, _) =>
    log.info(s"received $msg") // zipping with the ticker limits consumption to one element per 100ms
    msg
}.runWith(Sink.seq))

You would expect the artificially slow consumer to slow down the producer via backpressure. Looking at the logs, this doesn't seem to be the case:

sending 1
sending 2
sending 3
[...]
sending 1000

[1 second pause]

received 1
[100ms pause]
received 2
[100ms pause]
received 3
[...]

Am I missing any hidden buffers?

Example code:
https://github.com/an-tex/lagom-backpressure

Run sbt runAll, then execute curl 127.0.0.1:[port of hello-world-stream-client service]/api/test to see the effect.


Solution

  • There are system buffers that exceed the test size. On macOS there appears to be a 128 kB buffer (with 512 kB bursts). Beyond those buffers, backpressure works like a charm. I've updated the GitHub repo with a bigger test size in case someone wants to play around (see the sketch below).

    Credit goes to TimMoore, who answered this question on Lightbend Discuss.
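
For illustration, here is a minimal sketch of what a bigger test could look like (hypothetical values, not the exact change in the repo): emit far more elements, each carrying a larger payload, so the total stream size comfortably exceeds the observed OS buffers.

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.lightbend.lagom.scaladsl.api.ServiceCall

import scala.concurrent.Future

// Hypothetical: ~100,000 elements of roughly 1 KiB each, far more than the
// ~128 kB (512 kB burst) socket buffers, so backpressure becomes observable.
def stream: ServiceCall[NotUsed, Source[String, NotUsed]] = ServiceCall { _ =>
  Future.successful(
    Source(1.to(100000))
      .map(i => s"$i-${"x" * 1024}")                       // pad each element to ~1 KiB
      .wireTap(msg => log.info(s"sending ${msg.take(10)}")) // log only a prefix of each element
  )
}

Once the kernel buffers fill, the "sending" log lines should advance roughly in step with the consumer's "received" lines instead of racing ahead.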