I'm trying to create an example akka stream that takes a CSV file, changes it to XML (using an already existing object which has a toXml function) and then posts this to an endpoint. The code I have created looks like this:-
val poolClientFlow =
Http().cachedHostConnectionPool[Thing]("localhost",5000)
val file = new File("./example.csv")
val uploadPipeline =
FileIO.fromFile(file)
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
.map(_.utf8String)
.map(_.split(","))
.map(t => Thing(t(0),t(1).toInt,t(2).toInt) )
.map(_.toXml)
.map(_.toString)
.map(ByteString(_))
.map(d =>HttpRequest(method=HttpMethods.POST,uri=s"/import",entity = d))
.via(poolClientFlow)
.runForeach(x => System.out.println(x.toString()))
However it doesn't compile as the call to .via(poolClientFlow)
as it found a akka.stream.scaladsl.Flow[(akka.http.scaladsl.model.HttpRequest, com.cogpp.exp.Thing),(scala.util.Try[akka.http.scaladsl.model.HttpResponse],
but this version of via expects a akka.stream.Graph[akka.stream.FlowShape[akka.http.scaladsl.model.HttpRequest,?],?]
.
I think I have not constructed my poolClientFlow
correctly, but I don't see the difference between what I have done, and what I've seen in other example code. Can anyone help?
The Flow from cachedHostConnectionPool[T]
takes a tuple of (HttpRequest,T)
This lets you keep some context of your request when you eventually get the result as (Try[HttpResponse], T)
. If you dont need that just pass in Unit ()
.
Im not sure if there is an API that just takes the Request.
To get your example compiling you can ..
.map(d => (HttpRequest(method=HttpMethods.POST,uri=s"/import",entity = d), d))
.via(Http().cachedHostConnectionPool[ByteString]("localhost",5000))
Also if I where you I wouldnt have quite so many .map
's in that code. Your serialising isnt blocking and doesnt really need the back pressure between all of those steps. I'd have a pure function def write(t: Thing): HttpRequest
. But thats no big deal...