I have the following problem. I am querying a server for some data and getting the response back as an HttpEntity.Chunked. The response body looks like this, with up to 10,000,000 lines:
[{"name":"param1","value":122343,"time":45435345},
{"name":"param2","value":243,"time":4325435},
......]
Now I want to get the incoming data into an Array[String] where each String is a line from the response, because later on it should be imported into an Apache Spark DataFrame. Currently I am doing it like this:
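//For the http request
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.OutgoingConnection
import akka.http.scaladsl.client.RequestBuilding
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future

trait StartHttpRequest {
  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer

  def httpRequest(data: String, path: String, targetPort: Int, host: String): Future[HttpResponse] = {
    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
      Http().outgoingConnection(host, port = targetPort)
    }
    val responseFuture: Future[HttpResponse] =
      Source.single(RequestBuilding.Post(uri = path, entity = HttpEntity(ContentTypes.`application/json`, data)))
        .via(connectionFlow)
        .runWith(Sink.head)
    responseFuture
  }
}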
//result of the request
val responseFuture: Future[HttpResponse] = httpRequest(.....)
//convert to string
val stringFuture: Future[String] = responseFuture.flatMap { response =>
  response.status match {
    case StatusCodes.OK =>
      Unmarshal(response.entity).to[String]
  }
}
//and then something like this, but with even more stupid stuff
stringFuture.foreach { str =>
  masterActor ! str.split("""\},\{""")
}
My question is, what would be a better way to get the result into an array? How can I unmarshall the response entity directly? Because .to[Array[String]], for example, did not work. And because there are so many lines coming in, could I do it with a stream, to be more efficient?
Answering your questions out of order:
How can I unmarshall the response entity directly?
There is an existing question & answer related to unmarshalling an Array of case classes.
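The linked answer is not reproduced here, so the following is only a minimal sketch of direct unmarshalling, assuming akka-http-spray-json is on the classpath. The names Record, DirectUnmarshalSketch and unmarshalSample are hypothetical, and the two-element payload is made up for illustration. Note that this route reads the whole body into memory before parsing, which may not be practical for millions of lines.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical record type mirroring the JSON objects in the question
final case class Record(name: String, value: Int, time: Long)

object DirectUnmarshalSketch {
  implicit val recordFormat: RootJsonFormat[Record] = jsonFormat3(Record)

  // Made-up payload in the same shape as the server response
  val json: String =
    """[{"name":"param1","value":122343,"time":45435345},{"name":"param2","value":243,"time":4325435}]"""

  def unmarshalSample(): List[Record] = {
    implicit val system: ActorSystem = ActorSystem("unmarshal-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher
    try {
      // In real code this would be response.entity instead of a hand-built entity
      val entity = HttpEntity(ContentTypes.`application/json`, json)
      val records = Unmarshal(entity).to[Array[Record]]
      Await.result(records, 5.seconds).toList
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    unmarshalSample().foreach(println)
}
```

The reason .to[Array[String]] fails in the question is that no unmarshaller for that target type is in scope; with the spray-json support imported, the target type needs a JSON format, and the array elements here are objects rather than strings.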
what would be a better way to get the result into an array?
I would take advantage of the Chunked nature of the response and use streams. This allows you to do the string processing and JSON parsing concurrently, as the chunks arrive.
First you need a container class and parser:
import spray.json._

case class Data(name : String, value : Int, time : Long)

object MyJsonProtocol extends DefaultJsonProtocol {
  implicit val dataFormat: RootJsonFormat[Data] = jsonFormat3(Data)
}
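As a quick check of the container class and parser, here is a small sketch that parses a single object taken from the sample response. It assumes spray-json is on the classpath; ParseDemo and the line value are hypothetical names used only for this illustration.

```scala
import spray.json._

final case class Data(name: String, value: Int, time: Long)

object MyJsonProtocol extends DefaultJsonProtocol {
  implicit val dataFormat: RootJsonFormat[Data] = jsonFormat3(Data)
}

object ParseDemo {
  import MyJsonProtocol._

  // One complete JSON object, as it should look after the brace repair
  val line: String = """{"name":"param1","value":122343,"time":45435345}"""

  // parseJson builds a JsValue; convertTo uses the implicit dataFormat
  val parsed: Data = line.parseJson.convertTo[Data]

  def main(args: Array[String]): Unit =
    println(parsed)
}
```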
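Then you have to do some manipulations to get the JSON objects to look right:
import akka.stream.scaladsl.Flow
import akka.util.ByteString
import spray.json._
import MyJsonProtocol._

//Drops the '[' and the ']' characters
//(this removes every bracket, including any inside string values)
val dropArrayMarkers =
  Flow[ByteString].map(_.filterNot(b => b == '['.toByte || b == ']'.toByte))

//Restores the opening brace consumed by the "},{" delimiter
val prependBrace =
  Flow[String].map(s => if(!s.startsWith("{")) "{" + s else s)

//Restores the closing brace consumed by the "},{" delimiter
val appendBrace =
  Flow[String].map(s => if(!s.endsWith("}")) s + "}" else s)

val parseJson =
  Flow[String].map(_.parseJson.convertTo[Data])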
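To see why the braces need to be put back, the same manipulations can be tried on a plain String without any streams. This is only an illustration in standard-library Scala; BraceRepairDemo and the two-record payload are made up for the example.

```scala
import java.util.regex.Pattern

object BraceRepairDemo {
  // Hypothetical two-record payload in the same shape as the response
  val payload: String =
    """[{"name":"param1","value":122343,"time":45435345},{"name":"param2","value":243,"time":4325435}]"""

  // Same idea as dropArrayMarkers, applied to characters instead of bytes
  val withoutMarkers: String = payload.filterNot(c => c == '[' || c == ']')

  // Splitting on the literal "},{" consumes the braces on both sides of the comma.
  // Pattern.quote is needed because String.split expects a regular expression.
  val fragments: List[String] = withoutMarkers.split(Pattern.quote("},{")).toList

  // Same idea as the prepend and append flows: restore the missing braces
  val repaired: List[String] = fragments
    .map(s => if (!s.startsWith("{")) "{" + s else s)
    .map(s => if (!s.endsWith("}")) s + "}" else s)

  def main(args: Array[String]): Unit = {
    println("Fragments after splitting:")
    fragments.foreach(println)
    println("After brace repair:")
    repaired.foreach(println)
  }
}
```

The first fragment loses its closing brace and the last one loses its opening brace; fragments in the middle of a longer payload lose both, which is why both repair steps are applied to every element.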
Finally, combine these Flows to convert a Source of ByteString into a Source of Data objects:
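import akka.stream.scaladsl.{Framing, Source}

def strSourceToDataSource(source : Source[ByteString,_]) : Source[Data, _] =
  source.via(dropArrayMarkers)
    //Assumes objects are separated by exactly "},{" (no whitespace or newline in between)
    //and that no single object is longer than 256 bytes; allowTruncation = true lets the
    //last object through even though no delimiter follows it
    .via(Framing.delimiter(ByteString("},{"), 256, true))
    .map(_.utf8String)
    .via(prependBrace)
    .via(appendBrace)
    .via(parseJson)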
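The framing step is what makes this work on a chunked response, because a chunk boundary can fall anywhere, including in the middle of a record or of the delimiter. The sketch below feeds hand-made chunks through Framing.delimiter to illustrate that; FramingDemo, frames and the chunk contents are hypothetical, and ActorMaterializer is used only to match the rest of the code here.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object FramingDemo {
  // Hypothetical chunks: the first boundary splits a field name,
  // the second one splits the "},{" delimiter itself
  val chunks: List[ByteString] = List(
    ByteString("""{"name":"param1","val"""),
    ByteString("""ue":1,"time":10},"""),
    ByteString("""{"name":"param2","value":2,"time":20}""")
  )

  def frames(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("framing-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(chunks)
        // Buffers incoming bytes until a full delimiter has been seen
        .via(Framing.delimiter(ByteString("},{"), maximumFrameLength = 256, allowTruncation = true))
        .map(_.utf8String)
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    frames().foreach(println)
}
```

With these chunks the stream should emit two frames, one per record, each missing the brace that was part of the delimiter. If a record can exceed 256 bytes, the maximumFrameLength argument has to be raised, otherwise the stream fails.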
This source can then be drained into a Seq of Data objects:
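val dataSeq : Future[Seq[Data]] =
  responseFuture flatMap { response =>
    response.status match {
      case StatusCodes.OK =>
        strSourceToDataSource(response.entity.dataBytes).runWith(Sink.seq)
      case other =>
        //Drain the unread body so the connection is released, then fail the Future
        response.entity.dataBytes.runWith(Sink.ignore)
        Future.failed(new RuntimeException(s"Unexpected response status: $other"))
    }
  }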