Search code examples
scalaakka-httpakka-stream

Akka stream for posting data expects a graph not a flow in via


I'm trying to create an example akka stream that takes a CSV file, changes it to XML (using an already existing object which has a toXml function) and then posts this to an endpoint. The code I have created looks like this:-

val poolClientFlow =
  Http().cachedHostConnectionPool[Thing]("localhost",5000)

val file = new File("./example.csv")

val uploadPipeline =
  FileIO.fromFile(file)
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
    .map(_.utf8String)
    .map(_.split(","))
    .map(t => Thing(t(0),t(1).toInt,t(2).toInt) )
    .map(_.toXml)
    .map(_.toString)
    .map(ByteString(_))
    .map(d =>HttpRequest(method=HttpMethods.POST,uri=s"/import",entity = d))
    .via(poolClientFlow)
    .runForeach(x => System.out.println(x.toString()))

However it doesn't compile as the call to .via(poolClientFlow) as it found a akka.stream.scaladsl.Flow[(akka.http.scaladsl.model.HttpRequest, com.cogpp.exp.Thing),(scala.util.Try[akka.http.scaladsl.model.HttpResponse], but this version of via expects a akka.stream.Graph[akka.stream.FlowShape[akka.http.scaladsl.model.HttpRequest,?],?].

I think I have not constructed my poolClientFlow correctly, but I don't see the difference between what I have done, and what I've seen in other example code. Can anyone help?


Solution

  • The Flow from cachedHostConnectionPool[T] takes a tuple of (HttpRequest,T) This lets you keep some context of your request when you eventually get the result as (Try[HttpResponse], T). If you dont need that just pass in Unit ().

    Im not sure if there is an API that just takes the Request.

    To get your example compiling you can ..

    .map(d => (HttpRequest(method=HttpMethods.POST,uri=s"/import",entity = d), d))
    .via(Http().cachedHostConnectionPool[ByteString]("localhost",5000))
    

    Also if I where you I wouldnt have quite so many .map's in that code. Your serialising isnt blocking and doesnt really need the back pressure between all of those steps. I'd have a pure function def write(t: Thing): HttpRequest. But thats no big deal...