So, I'm using Play Framework 2.7 to set up a streaming server. What I'm trying to do is stream about 500 instances of a custom case class, all of similar size.
This is part of the controller that generates the stream -
def generate: Action[AnyContent] = Action {
  val products = (1 to 500).map(Product(_, "some random string")).toList
  Ok.chunked[Product](Source(products))
}
where Product is the custom case class I'm using. An implicit Writeable serialises this object to JSON.
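For reference, the case class and its Writeable might look roughly like the sketch below. This is an assumption, not the actual code from the controller: the field names are taken from the JSON shown further down, and the pretty-printing is a guess based on how that output is formatted.

```scala
import akka.util.ByteString
import play.api.http.Writeable
import play.api.libs.json.Json

// Named Product to match the question; note that it shadows scala.Product.
case class Product(id: Int, name: String)

object Product {
  // Hypothetical Writeable: turns each Product into the bytes of one
  // pretty-printed JSON object, which Ok.chunked sends as one chunk.
  implicit val writeable: Writeable[Product] = new Writeable[Product](
    p => ByteString(Json.prettyPrint(Json.obj("id" -> p.id, "name" -> p.name))),
    Some("application/json")
  )
}
```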
This is the part of the controller that processes the stream -
def process(): Action[AnyContent] = Action.async {
  val request = ws.url(STREAMING_URL).withRequestTimeout(Duration.Inf).withMethod("GET")
  request.stream().flatMap {
    _.bodyAsSource
      .map(_.utf8String)
      .map { x => println(x); x }
      .fold(0) { (acc, _) => acc + 1 }
      .runWith(Sink.last)
      .andThen {
        case Success(v) => println(s"Total count - $v")
        case Failure(_) => println("Error encountered")
      }
  }.map(_ => Ok)
}
What I expected is that each instance of my case class is transmitted as a single chunk and received likewise, so that the receiver can deserialise and use each one individually. With the above code, I should therefore receive exactly 500 chunks, but the count always comes out higher than that.
What I can see is that exactly one of these 500 objects is split and transmitted in 2 chunks instead of 1.
This is a normal object, as seen on the receiving side -
{
"id" : 494,
"name" : "some random string"
}
and this is an object that's split in two -
{
"id" : 463,
"name" : "some random strin
g"
}
As such, it cannot be deserialised back into an instance of my Product case class.
However, if I have some sort of throttling on the source in the sender controller, I receive the chunks just as expected.
For instance, this works completely fine where I stream only 5 elements per second -
def generate: Action[AnyContent] = Action {
  val products = (1 to 500).map(Product(_, "some random string")).toList
  Ok.chunked[Product](Source(products).throttle(5, 1.second))
}
Can anyone help me understand why this happens?
As described here, Akka Streams provides JsonFraming to separate valid JSON objects from an incoming ByteString stream.
In your case you can try it this way:
_.bodyAsSource.via(JsonFraming.objectScanner(Int.MaxValue)).map(_.utf8String)
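The idea is that the ByteString elements coming out of bodyAsSource are not guaranteed to line up with the chunks the sender wrote, so the receiver should frame on JSON object boundaries instead of counting raw elements. Here is a minimal standalone sketch of that behaviour, assuming the Akka 2.5 that ships with Play 2.7. FramingSketch, reframe and the 1024-byte limit are made-up names and values for illustration only.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{JsonFraming, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object FramingSketch {
  // Feeds arbitrary byte chunks through JsonFraming and returns one string
  // per complete JSON object, regardless of where the chunks were split.
  def reframe(chunks: List[String]): List[String] = {
    implicit val system: ActorSystem = ActorSystem("framing-sketch")
    // Explicit materializer, as required on Akka 2.5 (deprecated in 2.6).
    implicit val mat: Materializer = ActorMaterializer()
    try {
      val framed = Source(chunks.map(s => ByteString(s)))
        .via(JsonFraming.objectScanner(1024)) // max bytes per JSON object
        .map(_.utf8String)
        .runWith(Sink.seq)
      Await.result(framed, 5.seconds).toList
    } finally {
      system.terminate()
    }
  }
}
```

Calling FramingSketch.reframe with an object split across two chunks, like id 463 above, followed by a whole object should give back two complete JSON strings. In the process() controller the same objectScanner stage would sit before the fold, so the count reflects objects rather than received chunks.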