Search code examples
jsonscalaakkaakka-streamakka-http

Get Seq of entities from akka http response based on json array


I'm trying to use Flow like Akka http client and got in troubles with getting Seq of case class objects from json array in http response.

HTTP response:

{
 "bars": [],
 "foos": [
   {
    "id": "a7d1ba80-0934-11e9-0ef9-efa612d204a1",
    "type": "manual",
    "color": "green",
   },
   {
    "id": "b7d1ba80-0934-11e9-0ef9-efa612d204a2",
    "type": "semi-manual",
    "color": "white"
   }
 ]
}

Cases classes:

case class FooResponse(foos: Seq[Foo])
case class Foo(id: String, type: String, color: String)

Client I did:

private val flow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection(host, port)

def getFoos(): Seq[Foo] = {
  val req = HttpRequest(method = HttpMethods.GET)
  .withUri(Uri("/api/foo")).withHeaders(headers.Accept(MediaRange(MediaTypes.`application/json`)))

  Source
   .single(req)
   .via(flow)
   .map(response => Unmarshal(response.entity).to[FooResponse])
}

As a result I have a Source with Future[FooResponse]. How could I return Seq[Foo] from it as a function result.


Solution

  • I recommend changing the return type of getFoos from Seq[Foo] to Future[Seq[Foo]], in order to stay within the context of a Future:

    def getFoos(): Future[Seq[Foo]] = {
      val req =
        HttpRequest(method = HttpMethods.GET)
          .withUri(Uri("/api/foo"))
          .withHeaders(headers.Accept(MediaRange(MediaTypes.`application/json`)))
    
      Source
        .single(req)
        .via(flow)
        .map(response => Unmarshal(response.entity).to[FooResponse])
        .mapAsync(parallelism = 1)(fooResponse => fooResponse.map(_.foos))
        .runWith(Sink.head)
    }
    

    Also, since type is a reserved keyword in Scala, you need to wrap it in backticks in the Foo case class:

    case class Foo(id: String, `type`: String, color: String)