Search code examples
scalaakkaakka-streamakka-http

using akka streams for downloading binary data from server


while working with streams, I started to abstract my download handling into a RunnableGraph.

in this Graph I use one url to start my download process, I request a HttpResponse from url and evaluate the StatusCode. If there is a Redirect response, my process gives that Location header value to requestProcessor and try to download that again. If 200 receives, my process accumulate binary and send it to my sink.

My Problem: here is my out push statement and that sink will never receive any data :(

I hope my custom FlowShape is written right, but may there is an issue hidden.


Solution

  • getAsyncCallback works for my issue how jrudolph described in comment.

    changed code to:

    def onGrab(current: HttpResponse): Unit = {
       if (redirectCodes.contains(current.status) && current.header[Location].isDefined) {
         push[String](redirect, current.header[Location].get.value())
         current.entity.discardBytes()
         pull(in)
       } else {
         logger.debug("download . . .")
         current
           .entity
           .dataBytes
           .map(_.toArray)
           .runWith(Sink.head)
           .foreach(
             getAsyncCallback[Array[Byte]](value => push(out, value)).invoke
         )
       }
    }