scala akka akka-stream akka-http

http => akka stream => http


I'd like to use Akka Streams to pipe some JSON web services together, and I'd like to know the best approach to turn an HTTP response into a stream and stream its chunks to another HTTP request. Is there a way to define such a graph and run it, instead of the code below? So far I have tried the following, but I'm not sure it is actually streaming yet:

// Assumes: import akka.pattern.pipe, import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart,
// an implicit ExecutionContext (e.g. context.dispatcher), an implicit Materializer,
// and http = Http(context.system)
override def receive: Receive = {
  case GetTestData(p, id) =>
    // Get the data and pipe the response to itself as a message, as recommended in
    // https://doc.akka.io/docs/akka-http/current/client-side/request-level.html
    http.singleRequest(HttpRequest(uri = uri.format(p, id)))
      .pipeTo(self)

  case HttpResponse(StatusCodes.OK, _, entity, _) =>
    val initialRes = entity.dataBytes
      .via(JsonFraming.objectScanner(Int.MaxValue))
      .map(bStr => ChunkStreamPart(bStr.utf8String))

    // Forward the response to the next job and pipe the request's response to a dedicated actor
    http.singleRequest(HttpRequest(
      method = HttpMethods.POST,
      uri = "googl.cm/flow",
      entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
    ))

  case resp @ HttpResponse(code, _, _, _) =>
    log.error("Request to test job failed, response code: " + code)
    // Discard the entity bytes so the connection isn't stalled by backpressure
    resp.discardEntityBytes()

  case _ => log.warning("Unexpected message in TestJobActor")
}
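The framing step in the code above can be checked on its own, without any HTTP involved. The following is a minimal sketch, assuming Akka 2.6.x (where an implicit ActorSystem also supplies the Materializer); JsonFramingDemo and frame are hypothetical names. It feeds a JSON object that is split across two ByteString chunks, as an HTTP entity may deliver it, through JsonFraming.objectScanner:

```scala
// Sketch assuming Akka 2.6.x (akka-stream only); JsonFramingDemo and frame are hypothetical names.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{JsonFraming, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object JsonFramingDemo {
  // Runs the given text chunks through JsonFraming.objectScanner and
  // returns every complete JSON object it emits, decoded as UTF-8.
  def frame(chunks: List[String]): List[String] = {
    // In Akka 2.6 an implicit ActorSystem is enough to obtain a Materializer.
    implicit val system: ActorSystem = ActorSystem("json-framing-demo")
    try {
      val framed = Source(chunks.map(s => ByteString(s)))
        .via(JsonFraming.objectScanner(Int.MaxValue))
        .map(_.utf8String)
        .runWith(Sink.seq)
      Await.result(framed, 5.seconds).toList
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    // The second object is split across two chunks, as an HTTP entity may deliver it.
    frame(List("{\"a\":1}\n{\"b\"", ":2}\n")).foreach(println)
}
```

objectScanner buffers bytes only until the current object is complete and then emits it, so objects flow downstream incrementally instead of waiting for the whole response body.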

Solution

  • This should be a graph equivalent to your receive:

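    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
    import akka.stream.scaladsl.JsonFraming
    import scala.util.{Failure, Success}

    // Assumes an implicit ActorSystem in scope (Akka 2.6 derives the Materializer needed by
    // discardEntityBytes from it). host is a hypothetical val holding the first service's host
    // name: cachedHostConnectionPool takes a host (and optional port), not a full URI.
    Http()
      .cachedHostConnectionPool[Unit](host)
      .map {
        case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
          // Re-frame the body into JSON objects, one chunk per object
          val initialRes = entity.dataBytes
            .via(JsonFraming.objectScanner(Int.MaxValue))
            .map(bStr => ChunkStreamPart(bStr.utf8String))
          Some(initialRes)

        case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
          log.error("Request to test job failed, response code: " + code)
          // Discard the entity so the pooled connection isn't stalled by backpressure
          resp.discardEntityBytes()
          None

        case (Failure(e), _) =>
          log.error(e, "Request to test job failed")
          None
      }
      .collect {
        case Some(initialRes) => initialRes
      }
      .map { initialRes =>
        (HttpRequest(
           method = HttpMethods.POST,
           uri = "http://googl.cm/flow", // superPool needs an absolute URI
           entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
         ),
         ())
      }
      .via(Http().superPool[Unit]())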
    

    The type of this is Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), HostConnectionPool]. The Unit is the type of the correlation ID: a pool may emit responses in a different order than the requests went in, so if you need to know which request a response belongs to, use something like an Int or your (p, id) pair instead. The HostConnectionPool materialized value can be used to shut down the pool for that host. Only cachedHostConnectionPool gives you this materialized value; it is bound to a single host (a host name plus optional port, not a full URI), whereas superPool routes each request by its absolute URI, materializes NotUsed and manages its pools itself. Either way, I recommend just calling Http().shutdownAllConnectionPools() when your application shuts down, unless you have a specific reason not to. In my experience it's much less error prone (e.g. you can't forget to shut down an individual pool).
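To actually use the correlation slot, the same pipeline can be written generically in the ID type. This is a sketch, assuming Akka 2.6 and Akka HTTP 10.1/10.2; CorrelatedPipe, Pool, forwardOk, stubFirst, stubSecond and run are hypothetical names, not Akka API. Taking the two pool flows as parameters means you could pass Http().cachedHostConnectionPool[Int](...) and Http().superPool[Int]() in production, and the offline stub flows shown here in a test:

```scala
// Sketch assuming Akka 2.6 and Akka HTTP 10.1/10.2; all names below are hypothetical.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.stream.scaladsl.{Flow, JsonFraming, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object CorrelatedPipe {
  // Same shape as cachedHostConnectionPool[Id] / superPool[Id]
  type Pool[Id, M] = Flow[(HttpRequest, Id), (Try[HttpResponse], Id), M]

  // Forwards each 200 OK body, re-framed into one chunk per JSON object, to `target`.
  // The correlation id travels with every element, so callers can match results up.
  def forwardOk[Id, M](first: Pool[Id, M], second: Pool[Id, NotUsed], target: Uri)(
      implicit system: ActorSystem): Pool[Id, M] =
    first
      .mapConcat[(HttpRequest, Id)] {
        case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), id) =>
          val chunks = entity.dataBytes
            .via(JsonFraming.objectScanner(Int.MaxValue))
            .map(bStr => ChunkStreamPart(bStr.utf8String))
          val post = HttpRequest(
            method = HttpMethods.POST,
            uri = target,
            entity = HttpEntity.Chunked(ContentTypes.`application/json`, chunks))
          List(post -> id)
        case (Success(resp), id) =>
          system.log.error("Request {} failed, response code: {}", id, resp.status)
          resp.discardEntityBytes() // Materializer comes from the implicit ActorSystem
          Nil
        case (Failure(e), id) =>
          system.log.error(e, "Request {} failed", id)
          Nil
      }
      .via(second) // via keeps the left (first pool's) materialized value

  // Offline stand-ins for the two pools: even ids get 200 OK, odd ids get 404.
  val stubFirst: Pool[Int, NotUsed] = Flow[(HttpRequest, Int)].map { case (_, id) =>
    val status = if (id % 2 == 0) StatusCodes.OK else StatusCodes.NotFound
    val body = HttpEntity(ContentTypes.`application/json`, s"""{"n":$id}""")
    (Try(HttpResponse(status, entity = body)), id)
  }

  def stubSecond(implicit system: ActorSystem): Pool[Int, NotUsed] =
    Flow[(HttpRequest, Int)].map { case (req, id) =>
      req.discardEntityBytes() // consume the forwarded chunked body
      (Try(HttpResponse(StatusCodes.Created)), id)
    }

  // Sends one request per id through the stubbed pipeline and waits for the results.
  def run(ids: List[Int])(implicit system: ActorSystem): Seq[(Try[HttpResponse], Int)] = {
    val done = Source(ids)
      .map(id => (HttpRequest(uri = s"/test/$id"), id))
      .via(forwardOk(stubFirst, stubSecond, Uri("http://localhost/flow")))
      .runWith(Sink.seq)
    Await.result(done, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("correlated-pipe")
    try run(List(1, 2, 3, 4)).foreach { case (res, id) => println(s"$id -> ${res.map(_.status)}") }
    finally system.terminate()
  }
}
```

Only the ids whose first response was 200 OK come out of the pipeline, each still paired with the id it went in with, which is exactly what the Unit slot would give you if it carried real data.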

    You can also use the Graph DSL to express the same graph:

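    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
    import akka.stream.FlowShape
    import akka.stream.scaladsl.{Flow, GraphDSL, JsonFraming, Source}
    import scala.util.{Failure, Success, Try}

    // host is a hypothetical val holding the first service's host name (not a full URI).
    val pool = Http().cachedHostConnectionPool[Unit](host)

    // Passing the pool to GraphDSL.create keeps its HostConnectionPool materialized value;
    // GraphDSL.create() with no argument would materialize NotUsed instead.
    val graph = Flow.fromGraph(GraphDSL.create(pool) { implicit b => host1Flow =>
      import GraphDSL.Implicits._

      val host2Flow = b.add(Http().superPool[Unit]())

      // Re-frame OK bodies into JSON chunks; log and discard everything else
      val toInitialRes = b.add(
        Flow[(Try[HttpResponse], Unit)]
          .map {
            case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
              val initialRes = entity.dataBytes
                .via(JsonFraming.objectScanner(Int.MaxValue))
                .map(bStr => ChunkStreamPart(bStr.utf8String))
              Some(initialRes)

            case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
              log.error("Request to test job failed, response code: " + code)
              // Discard the entity so the pooled connection isn't stalled by backpressure
              resp.discardEntityBytes()
              None

            case (Failure(e), _) =>
              log.error(e, "Request to test job failed")
              None
          }
      )

      val keepOkStatus = b.add(
        Flow[Option[Source[HttpEntity.ChunkStreamPart, Any]]]
          .collect {
            case Some(initialRes) => initialRes
          }
      )

      val toOtherHost = b.add(
        Flow[Source[HttpEntity.ChunkStreamPart, Any]]
          .map { initialRes =>
            (HttpRequest(
               method = HttpMethods.POST,
               uri = "http://googl.cm/flow", // superPool needs an absolute URI
               entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
             ),
             ())
          }
      )

      host1Flow ~> toInitialRes ~> keepOkStatus ~> toOtherHost ~> host2Flow

      FlowShape(host1Flow.in, host2Flow.out)
    })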
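Whether a Graph DSL version keeps the HostConnectionPool materialized value depends on how GraphDSL.create is called. The following is a minimal offline sketch, assuming Akka 2.6.x, with a plain Flow[Int, Int, String] standing in for the pool (MatValueDemo, pool, dropsMat and keepsMat are hypothetical names). GraphDSL.create() materializes NotUsed even though the pool is added inside, while GraphDSL.create(pool) passes the pool's shape into the builder and keeps its materialized value:

```scala
// Sketch assuming Akka 2.6.x; MatValueDemo, pool, dropsMat and keepsMat are hypothetical names.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Sink, Source}

object MatValueDemo {
  // Stand-in for cachedHostConnectionPool: a flow whose materialized value we care about.
  val pool: Flow[Int, Int, String] =
    Flow[Int].map(_ * 2).mapMaterializedValue(_ => "pool-handle")

  // GraphDSL.create(): pool is added inside the builder, so its materialized value is dropped.
  val dropsMat: Flow[Int, Int, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val p = b.add(pool)
    val inc = b.add(Flow[Int].map(_ + 1))
    p ~> inc
    FlowShape(p.in, inc.out)
  })

  // GraphDSL.create(pool): the builder receives pool's shape and the graph keeps its value.
  val keepsMat: Flow[Int, Int, String] = Flow.fromGraph(GraphDSL.create(pool) { implicit b => p =>
    import GraphDSL.Implicits._
    val inc = b.add(Flow[Int].map(_ + 1))
    p ~> inc
    FlowShape(p.in, inc.out)
  })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("mat-value-demo")
    // Keep.right selects the flow's materialized value; .to(...) then keeps it (Keep.left).
    val kept: String = Source(List(1, 2, 3)).viaMat(keepsMat)(Keep.right).to(Sink.ignore).run()
    val dropped: NotUsed = Source(List(1, 2, 3)).viaMat(dropsMat)(Keep.right).to(Sink.ignore).run()
    println(s"kept = $kept, dropped = $dropped")
    system.terminate()
  }
}
```

Both variants transform the elements identically; only the materialized value differs, which is what decides whether you can reach the HostConnectionPool handle after running the graph.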