Akka Streams recursive flow call

I'm trying to do implement paging using Akka Streams. Currently I have

case class SomeObject(id:Long, next_page:Option[Map[String,String]])
def chainRequests(uri: Uri): Future[Option[(Uri, T)]] = {
    if (uri.isEmpty) return Future.successful(None)
    val response: Future[Response[T]] = sendWithRetry(prepareRequest(HttpMethods.GET, uri)).flatMap(unmarshal) { resp =>
      resp.next_page match {
        case Some(next_page) => Some(next_page("uri"),
        case _ => Some(Uri.Empty,
Source.single(SomeObject).map(Uri(s"object/${}")).map(uri => Source.unfoldAsync(url)(chainRequest)).map(...some processing goes here)

The problem is that if I do source.take(1000) and paging has a lot of elements(pages) than downstream does not gets new elements until Source.unfoldAsync finishes.

I was trying to use cycles in Flows like

val in = builder.add(Flow[Uri])
val out = builder.add[Flow[T]]

val partition = b.add(Partition[Response[T]](2,r => r.next_page match {case Some(_)=>1; case None => 0}))
val merge = b.add(Merge[Response[T]],2)
in ~> mergeUri ~> sendRequest ~> partition
      mergeUri.preferred <~ extractNextUri <~ partition.out(1)
      partition.out(0) ~> Flow[Response[T]].map( ~> out
FlowShape(, out.out)

But above code does not work.

I'm stuck with creating my own GraphStage. UnfoldAsync takes first element, but with Flow solution I don't have "first" element. Any suggestions?



  • Found the solution with writing my own GraphStage

    final class PaginationGraphStage[S <: Uri, E](f: S => Future[Option[(S, E)]])(implicit ec: ExecutionContextExecutor)
      extends GraphStage[FlowShape[S, E]]{
      val in: Inlet[S] = Inlet[S]("")
      val out: Outlet[E] = Outlet[E]("PaginationGraphStage.out")
      override val shape: FlowShape[S, E] = FlowShape.of(in, out)
      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) with OutHandler with InHandler {
          private[this] var state: S = _
          private[this] var inFlight = 0
          private[this] var asyncFinished = false
          private[this] def todo: Int = inFlight
          def futureCompleted(result: Try[Option[(Uri, E)]]): Unit = {
            inFlight -= 1
            result match {
              case Failure(ex) => fail(out, ex)
              case Success(None) =>
                asyncFinished = true
              case Success(Some((newS: S, elem: E))) if !newS.isEmpty =>
                push(out, elem)
                state = newS
              case Success(Some((newS: Uri, elem: E))) =>
                push(out, elem)
                asyncFinished = true
                if (isAvailable(in)) getHandler(in).onPush()
                else completeStage()
          private val futureCB = getAsyncCallback(futureCompleted)
          private val invokeFutureCB: Try[Option[(S, E)]] => Unit = futureCB.invoke
          private def pullIfNeeded(): Unit = {
            if (!hasBeenPulled(in)) tryPull(in)
          override def onUpstreamFinish(): Unit = {
            if (todo == 0) completeStage()
          def onPull(): Unit = {
            if (state != null) {
              asyncFinished = false
              inFlight += 1
              val future = f(state)
              future.value match {
                case None => future.onComplete(invokeFutureCB)
                case Some(v) => futureCompleted(v)
            } else {
          override def onPush(): Unit = {
            if (state == null) {
              inFlight += 1
              state = grab(in)
            if (asyncFinished) {
              inFlight += 1
              state = grab(in)
          setHandlers(in, out, this)