While working with streams, I started to abstract my download handling into a RunnableGraph.
In this graph I start the download process from a single URL: I request an HttpResponse from the URL and evaluate the StatusCode. If the response is a redirect, my process passes the Location header value back to requestProcessor and tries the download again.
If a 200 is received, my process accumulates the binary data and sends it to my sink.
My problem: here is my push statement for out, and the sink never receives any data :(
I hope my custom FlowShape is written correctly, but maybe there is a hidden issue.
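To make the intended routing easier to reason about separately from the stage itself, here is a minimal, library-free sketch of the decision described above (redirect with a Location header, a 200 download, anything else). All names in it (Response, Route, Redirect, Download, Unhandled, Routing.route) are hypothetical stand-ins rather than Akka types, and the set of redirect codes is an assumption, since the real redirectCodes is not shown here.

```scala
// Hypothetical model of a response: only the status code and an optional
// Location header value, not the real HttpResponse type.
final case class Response(status: Int, location: Option[String])

// Possible outcomes of the routing decision (illustrative names).
sealed trait Route
final case class Redirect(url: String) extends Route
case object Download extends Route
final case class Unhandled(status: Int) extends Route

object Routing {
  // Assumed redirect status codes; the original redirectCodes is not shown.
  val redirectCodes: Set[Int] = Set(301, 302, 303, 307, 308)

  def route(r: Response): Route = r match {
    // A redirect is followed only when a Location header is present.
    case Response(status, Some(url)) if redirectCodes.contains(status) => Redirect(url)
    // A 200 response is the one whose body gets downloaded.
    case Response(200, _) => Download
    // Everything else is left undecided in this sketch (an assumption).
    case Response(status, _) => Unhandled(status)
  }
}
```

In the real stage, a Redirect result would correspond to pushing the URL back to requestProcessor, and Download to reading the entity and pushing the bytes to the sink.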
getAsyncCallback solved my issue, as jrudolph described in a comment.
I changed the code to:
def onGrab(current: HttpResponse): Unit = {
  if (redirectCodes.contains(current.status) && current.header[Location].isDefined) {
    // Redirect: emit the new URL on the redirect outlet and drop the response body.
    push[String](redirect, current.header[Location].get.value())
    current.entity.discardBytes()
    pull(in)
  } else {
    logger.debug("download . . .")
    current
      .entity
      .dataBytes
      .map(_.toArray)
      .runWith(Sink.head)
      .foreach(
        // The Future completes outside the stage, so the push goes through an async callback.
        getAsyncCallback[Array[Byte]](value => push(out, value)).invoke
      )
  }
}