I'd like to use Akka Streams to pipe some JSON web services together. What is the best approach to make a stream from an HTTP request and stream its chunks into another request? Is there a way to define such a graph and run it instead of the code below? So far I've tried it this way, but I'm not sure it is actually streaming yet:
```scala
// Assumes `import akka.pattern.pipe` and an implicit ExecutionContext in scope (needed by pipeTo)
override def receive: Receive = {
  case GetTestData(p, id) =>
    // Get the data and pipe it to self as a message, as recommended in
    // https://doc.akka.io/docs/akka-http/current/client-side/request-level.html
    http.singleRequest(HttpRequest(uri = uri.format(p, id)))
      .pipeTo(self)
  case HttpResponse(StatusCodes.OK, _, entity, _) =>
    val initialRes = entity.dataBytes
      .via(JsonFraming.objectScanner(Int.MaxValue))
      .map(bStr => ChunkStreamPart(bStr.utf8String))
    // Forward the response to the next job; the POST's response should then be
    // piped to a dedicated actor (its Future is not handled here yet)
    http.singleRequest(HttpRequest(
      method = HttpMethods.POST,
      uri = "googl.cm/flow",
      entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
    ))
  case resp @ HttpResponse(code, _, _, _) =>
    log.error("Request to test job failed, response code: " + code)
    // Discard the entity so unconsumed bytes don't back-pressure and stall the connection
    resp.discardEntityBytes()
  case _ => log.warning("Unexpected message in TestJobActor")
}
```
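One part of the "is it really streaming?" question can be checked offline: the framing step. `JsonFraming.objectScanner` emits each JSON object once its closing brace has arrived (and there is downstream demand), even when an object is split across incoming `ByteString`s, so the outgoing `HttpEntity.Chunked` is fed object by object rather than after the whole download. A minimal sketch, assuming Akka 2.6 with akka-stream and akka-http on the classpath; the `JsonRechunk` object, its method names and the 1 MiB default limit are hypothetical choices of mine:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.stream.scaladsl.{JsonFraming, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

object JsonRechunk {
  // Hypothetical helper: frames a streamed body into one chunk per top-level JSON
  // object, the same dataBytes -> objectScanner -> ChunkStreamPart step as above.
  // Objects longer than maxObjectLength fail the stream (the question uses Int.MaxValue).
  def toChunkedJson(body: Source[ByteString, Any], maxObjectLength: Int = 1024 * 1024): HttpEntity.Chunked =
    HttpEntity.Chunked(
      ContentTypes.`application/json`,
      body
        .via(JsonFraming.objectScanner(maxObjectLength))
        .map(bs => ChunkStreamPart(bs.utf8String))
    )

  // Test helper: materializes the chunks and returns their text (only sensible for small bodies).
  def chunkTexts(entity: HttpEntity.Chunked)(implicit system: ActorSystem): Future[Seq[String]] =
    entity.chunks.map(_.data.utf8String).runWith(Sink.seq)
}
```

For example, a body arriving as the two pieces `{"id":1}{"i` and `d":2}` comes out as the two chunks `{"id":1}` and `{"id":2}`.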
This should be a graph equivalent to your `receive`:
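```scala
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.stream.scaladsl.JsonFraming
import scala.util.Success

// Assumes an implicit ActorSystem, a `log`, and the `uri`, `p`, `id` values from the question.
// Note: cachedHostConnectionPool expects a host name (plus optional port), not a full URI;
// the incoming HttpRequests then carry just the path.
Http()
  .cachedHostConnectionPool[Unit](uri.format(p, id))
  .collect {
    case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
      // Re-chunk the body: one ChunkStreamPart per JSON object
      val initialRes = entity.dataBytes
        .via(JsonFraming.objectScanner(Int.MaxValue))
        .map(bStr => ChunkStreamPart(bStr.utf8String))
      Some(initialRes)
    case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
      log.error("Request to test job failed, response code: " + code)
      // Drain the entity; an unconsumed entity back-pressures and stalls the pooled connection
      resp.discardEntityBytes()
      None
    // Failure(...) elements match neither case, so collect silently drops them
  }
  .collect {
    case Some(initialRes) => initialRes
  }
  .map { initialRes =>
    (HttpRequest(
       method = HttpMethods.POST,
       // Placeholder from the question; superPool must be able to tell the target
       // host from the request, e.g. via an absolute URI like "http://host/flow"
       uri = "googl.cm/flow",
       entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
     ),
     ())
  }
  .via(Http().superPool[Unit]())
```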
The type of this is `Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), HostConnectionPool]`. The `Unit` type parameter is the slot for a correlation ID: if you need to know which request an arriving response belongs to, put an ID type there instead of `Unit`. The `HostConnectionPool` materialized value can be used to shut down the pool for that host; the whole chain still materializes to it because `via` keeps the materialized value of the left-hand side. Only `cachedHostConnectionPool` gives you this materialized value; `superPool` materializes to `NotUsed` and probably manages its pools on its own (though I haven't checked). Anyway, I recommend simply calling `Http().shutdownAllConnectionPools()` when your application shuts down, unless you have a specific reason not to. In my experience it's much less error-prone (e.g. there's no individual shutdown to forget).
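Using a real correlation type instead of `Unit` could look like the sketch below. To keep it runnable without a network, `stubPool` is a hypothetical stand-in with the same element types as `Http().cachedHostConnectionPool[Long](...)` or `Http().superPool[Long]()`; the `CorrelationDemo` and `statusById` names are also mine. With a real pool you would pass that flow in instead and call `Http().shutdownAllConnectionPools()` when the application stops.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCode, StatusCodes, Uri}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Future
import scala.util.{Success, Try}

object CorrelationDemo {
  // Hypothetical stand-in for a connection pool: same (request, id) -> (Try[response], id)
  // shape, but it answers locally. Paths ending in "missing" get a 404.
  val stubPool: Flow[(HttpRequest, Long), (Try[HttpResponse], Long), NotUsed] =
    Flow[(HttpRequest, Long)].map { case (req, id) =>
      val status: StatusCode =
        if (req.uri.path.toString.endsWith("missing")) StatusCodes.NotFound else StatusCodes.OK
      (Success(HttpResponse(status)), id)
    }

  // Tags each request with its index, so every response can be matched to its request
  // even if a real pool delivers responses out of order.
  def statusById(paths: Seq[String], pool: Flow[(HttpRequest, Long), (Try[HttpResponse], Long), Any])
                (implicit system: ActorSystem): Future[Map[Long, Int]] =
    Source(paths.toList)
      .zipWithIndex                                      // (path, id)
      .map { case (path, id) => (HttpRequest(uri = Uri(path)), id) }
      .via(pool)
      .collect { case (Success(resp), id) =>
        resp.discardEntityBytes()                        // always consume or discard the entity
        id -> resp.status.intValue
      }
      .runWith(Sink.seq)
      .map(_.toMap)(system.dispatcher)
}
```

With the paths `/a`, `/missing`, `/b` this yields `Map(0 -> 200, 1 -> 404, 2 -> 200)`.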
You can also use the Graph DSL to express the same graph:
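```scala
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, JsonFraming, Source}
import scala.util.{Success, Try}

// Same assumptions as the linear version: an implicit ActorSystem, `log`, and `uri`, `p`, `id`.
// GraphDSL.create() without arguments materializes to NotUsed, so this Flow
// does not expose the HostConnectionPool.
val graph = Flow.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val host1Flow = b.add(Http().cachedHostConnectionPool[Unit](uri.format(p, id)))
  val host2Flow = b.add(Http().superPool[Unit]())

  // OK responses become a stream of JSON chunks; other responses are logged and drained
  val toInitialRes = b.add(
    Flow[(Try[HttpResponse], Unit)]
      .collect {
        case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
          val initialRes = entity.dataBytes
            .via(JsonFraming.objectScanner(Int.MaxValue))
            .map(bStr => ChunkStreamPart(bStr.utf8String))
          Some(initialRes)
        case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
          log.error("Request to test job failed, response code: " + code)
          // Drain the entity so it doesn't back-pressure and stall the pooled connection
          resp.discardEntityBytes()
          None
      }
  )

  // Keep only the bodies of successful responses
  val keepOkStatus = b.add(
    Flow[Option[Source[HttpEntity.ChunkStreamPart, Any]]]
      .collect {
        case Some(initialRes) => initialRes
      }
  )

  // Wrap each body in a chunked POST to the second service
  val toOtherHost = b.add(
    Flow[Source[HttpEntity.ChunkStreamPart, Any]]
      .map { initialRes =>
        (HttpRequest(
           method = HttpMethods.POST,
           uri = "googl.cm/flow", // placeholder; superPool must be able to tell the target host
           entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
         ),
         ())
      }
  )

  host1Flow ~> toInitialRes ~> keepOkStatus ~> toOtherHost ~> host2Flow
  FlowShape(host1Flow.in, host2Flow.out)
})
```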
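The DSL version is built with `GraphDSL.create()` without arguments, so it materializes to `NotUsed` and the `HostConnectionPool` handle that the linear version exposes is lost. One way to keep it is to pass the pool flow to `GraphDSL.create(...)` and wire the shape handed to the build block, instead of calling `b.add` on it. A generic sketch (the `KeepPoolMat.pipeline` name is hypothetical; in the usage, stub flows stand in for the HTTP pools so it runs offline):

```scala
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL}

object KeepPoolMat {
  // Hypothetical generic version of the DSL graph: first ~> middle ~> second.
  // Because `first` is passed to GraphDSL.create(...), the resulting Flow
  // materializes to first's value (e.g. a HostConnectionPool).
  def pipeline[A, B, C, D, M](
      first: Flow[A, B, M],
      middle: Flow[B, C, Any],
      second: Flow[C, D, Any]): Flow[A, D, M] =
    Flow.fromGraph(GraphDSL.create(first) { implicit b => firstShape =>
      import GraphDSL.Implicits._
      val mid = b.add(middle)
      val snd = b.add(second)
      firstShape ~> mid ~> snd
      FlowShape(firstShape.in, snd.out)
    })
}
```

With the real flows, `first` would be `Http().cachedHostConnectionPool[Unit](...)`, `middle` the collect/map steps, and `second` `Http().superPool[Unit]()`; running the result with `viaMat(...)(Keep.right)` then gives you the `HostConnectionPool` back.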