Search code examples
Tags: scala, pagination, akka, akka-stream

Akka Streams flow to handle paginated results doesn't complete


I'd like to implement a Flow to handle paginated results (e.g., underlying service returns some results, but also indicates that more results are available by making another request, passing in e.g. a cursor).

Things I've done so far:

  1. I have implemented the following flow and test, but the flow doesn't complete.

    /**
      * Attempt at a reusable flow that follows paginated results by feeding
      * follow-up requests back into the underlying flow through a graph cycle.
      */
    object AdditionalRequestsFlow {
    
      /**
        * Wraps `flow` so that every response is emitted as a pair with a request.
        *
        * Each incoming request is broadcast to two branches: one goes straight to
        * the first input of a `Zip`, the other goes through `flow` into the second.
        *
        * NOTE(review): the pairing is positional, so this presumably relies on
        * `flow` emitting exactly one response per request, in order - confirm.
        */
      private def keepRequest[Request, Response](flow: Flow[Request, Response, NotUsed]): Flow[Request, (Request, Response), NotUsed] = {
        Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
          import GraphDSL.Implicits._
          val in = builder.add(Flow[Request])
    
          val bcast = builder.add(Broadcast[Request](2))
          // Despite its name, `merge` is a Zip stage: it pairs a request with a response.
          val merge = builder.add(Zip[Request, Response]())
    
          in ~> bcast         ~> merge.in0
                bcast ~> flow ~> merge.in1
    
          FlowShape(in.in, merge.out)
        })
      }
    
      /**
        * Builds a cyclic graph that re-submits follow-up requests to `inputFlow`.
        *
        * @param inputFlow      the underlying request/response flow
        * @param anotherRequest yields the follow-up request for a request/response
        *                       pair, or `None` when there is nothing more to fetch
        * @param extractOutput  converts a response into an output element
        * @param mergeOutput    intended to combine outputs; not used by this
        *                       implementation
        * @return a flow that emits one `Output` per response (not per original
        *         request, see the comment on `finish`)
        *
        * NOTE(review): the likely reason this never completes is the feedback
        * cycle. `Merge` is created without eager completion, so it presumably
        * waits for both inputs to finish, while the feedback input can only
        * finish after `merge` itself has - confirm against the Akka Streams
        * documentation on graph cycles.
        */
      def flow[Request, Response, Output](
        inputFlow: Flow[Request, Response, NotUsed],
        anotherRequest: (Request, Response) => Option[Request],
        extractOutput: Response => Output,
        mergeOutput: (Output, Output) => Output
      ): Flow[Request, Output, NotUsed] = {
        Flow.fromGraph(GraphDSL.create() { implicit b =>
          import GraphDSL.Implicits._
    
          val start = b.add(Flow[Request])
          // Joins the external requests with the follow-up requests fed back from `unzip`.
          val merge = b.add(Merge[Request](2))
          val underlying = b.add(keepRequest(inputFlow))
          // Drops `None`s so that only actual follow-up requests re-enter the cycle.
          val unOption = b.add(Flow[Option[Request]].mapConcat(_.toList))
          // Splits each (request, response) pair into the response and the optional follow-up request.
          val unzip = b.add(UnzipWith[(Request, Response), Response, Option[Request]] { case (req, res) =>
            (res, anotherRequest(req, res))
          })
          val finish = b.add(Flow[Response].map(extractOutput)) // this is wrong as we don't keep to 1 Request -> 1 Output, but first let's get the flow to work
    
          // Forward path: requests -> underlying flow -> outputs.
          // Feedback path: follow-up requests from `unzip.out1` go back into `merge`.
          start ~> merge ~> underlying ~> unzip.in
                                          unzip.out0            ~>  finish
                   merge <~ unOption   <~ unzip.out1
    
          FlowShape(start.in, finish.out)
        })
      }       
    }
    

    The test:

        import akka.NotUsed
        import akka.actor.ActorSystem
        import akka.stream.ActorMaterializer
        import akka.stream.scaladsl.{Flow, Sink, Source}
        import org.scalatest.FlatSpec
        import org.scalatest.Matchers._
        import cats.syntax.option._
        import org.scalatest.concurrent.ScalaFutures.whenReady
    
        /** Checks that `AdditionalRequestsFlow.flow` gathers every page of each request. */
        class AdditionalRequestsFlowSpec extends FlatSpec {
          implicit val system: ActorSystem = ActorSystem()
          implicit val materializer: ActorMaterializer = ActorMaterializer()

          /** Asks for the numbers `0 until max`, served `batchSize` at a time starting at `offset`. */
          case class Request(max: Int, batchSize: Int, offset: Option[Int] = None)

          /** One page of numbers plus the offset of the next page, if there is one. */
          case class Response(values: List[Int], nextOffset: Option[Int])

          // Fake paginated service: answers each request with a single page of numbers.
          private val flow: Flow[Request, Response, NotUsed] =
            Flow[Request].map { req =>
              val from = req.offset.getOrElse(0)
              val upTo = Math.min(req.max, from + req.batchSize)
              Response(
                values = (from until upTo).toList,
                nextOffset = if (upTo == req.max) None else Some(upTo)
              )
            }

          "AdditionalRequestsFlow" should "collect additional responses" in {
            // Follow-up request for the next page, when the response announces one.
            def nextPage(request: Request, response: Response): Option[Request] =
              response.nextOffset.map(next => request.copy(offset = next.some))

            def valuesOf(response: Response): List[Int] = response.values
            def concat(left: List[Int], right: List[Int]): List[Int] = left ::: right

            val requests = List(
              Request(max = 35, batchSize = 10),
              Request(max = 5, batchSize = 10),
              Request(max = 100, batchSize = 1)
            )

            // Every request should end up as the full range 0 until max.
            val expected = requests.map(r => List.range(0, r.max))

            val underTest = AdditionalRequestsFlow.flow(flow, nextPage, valuesOf, concat)
            val future = Source(requests).via(underTest).runWith(Sink.seq)

            whenReady(future) { result =>
              result shouldEqual expected
            }
          }
        }
    
  2. Implemented the same flow in a terrible, blocking way to illustrate what I'm trying to achieve:

       /**
         * Blocking reference implementation: for every incoming request it keeps
         * running `inputFlow` (one materialization per page) until `anotherRequest`
         * yields `None`, folding the extracted outputs together with `mergeOutput`.
         *
         * Creates its own ActorSystem and materializer and blocks with `Await`,
         * so it only illustrates the intended semantics.
         */
       def uglyHackFlow[Request, Response, Output](
        inputFlow: Flow[Request, Response, NotUsed],
        anotherRequest: (Request, Response) => Option[Request],
        extractOutput: Response => Output,
        mergeOutput: (Output, Output) => Output
      ): Flow[Request, Output, NotUsed] = {
        implicit val system: ActorSystem = ActorSystem()
        implicit val materializer: ActorMaterializer = ActorMaterializer()

        // Runs a single request through `inputFlow` and blocks for its first response. :(
        def fetch(request: Request): Response =
          Await.result(Source.single(request).via(inputFlow).runWith(Sink.head), 10.seconds)

        // Collects the output of `request` and, recursively, of all its follow-up requests.
        def collect(request: Request): Output = {
          val response = fetch(request)
          val followUp = anotherRequest(request, response)
          val output = extractOutput(response)
          followUp.fold(output)(next => mergeOutput(output, collect(next)))
        }

        Flow[Request].map(collect)
      }
    

    This works (but we should not be materializing anything / Await-ing at this point).

  3. Reviewed http://doc.akka.io/docs/akka/2.4/scala/stream/stream-graphs.html#Graph_cycles__liveness_and_deadlocks which I believe contains the answer, however I cannot seem to find it there. In my case, I would expect the cycle to contain at most one element at any time, so neither buffer overflow nor complete starvation should occur - but evidently one of them does.

  4. Tried to debug the stream using .withAttributes(Attributes(LogLevels(...))) however it doesn't result in any output despite seemingly correctly configured loggers.

I'm looking for hints on how to fix the flow method while keeping the same signature and semantics (so that the test passes).

Or perhaps I'm doing something completely off-base here (e.g., there is an existing feature in, say, akka-stream-contrib which solves this)?


Solution

  • I think it's much safer to use Source.unfold than to create custom graphs. Here is what I typically do (with minor variations depending on API).

      /**
        * Streams articles from the paginated `/works` endpoint by following the
        * `next-cursor` returned with each page.
        *
        * Each page yields its parsed articles as `Right` values followed by a
        * `Left(nextCursor)` marker. The stream ends when the request fails, the
        * API reports an error, or a page arrives without a `next-cursor`; in the
        * last case that page's items are still emitted.
        *
        * @param lastTokenOpt cursor to resume from; `"*"` is used when it is `None`
        * @param filterIds    not used by this implementation
        * @return a source of parsed articles interleaved with cursor markers
        */
      override def getArticles(lastTokenOpt: Option[String], filterIds: (Seq[Id]) => Seq[Id]): Source[Either[String, ImpArticle], NotUsed] = {
        import scala.util.control.NonFatal

        val maxRows = 1000

        def getUri(cursor: String, count: Int) = s"/works?rows=$count&filter=type:journal-article&order=asc&sort=deposited&cursor=${URLEncoder.encode(cursor, "UTF-8")}"

        Source.unfoldAsync(lastTokenOpt.getOrElse("*")) { cursor =>
          // An empty cursor is the sentinel emitted below when `next-cursor` is missing:
          // nothing more is requested (or logged as being requested) and the stream ends.
          if (cursor.nonEmpty) {
            // Build the URI once; it is reused for the request and for every log message.
            val uri = getUri(cursor, maxRows)
            println(s"Getting $uri")

            sendGetRequest[CrossRefResponse[CrossRefList[JsValue]]](uri).map {
              case Some(response) =>
                response.message match {
                  case Left(list) if response.status == "ok" =>

                    println(s"Got ${list.items.length} items")
                    val items = list.items.flatMap { js =>
                      try {
                        parseArticle(js)
                      } catch {
                        // Log the offending document, then let the failure propagate.
                        case NonFatal(ex) =>
                          logger.error(s"Error on parsing: ${js.compactPrint}")
                          throw ex
                      }
                    }

                    list.`next-cursor` match {
                      case Some(nextCursor) =>
                        // Emit the cursor after the items so downstream can record progress.
                        Some(nextCursor -> (items.map(Right.apply).toList ::: List(Left(nextCursor))))
                      case None =>
                        logger.error(s"`next-cursor` is missing when fetching from CrossRef [status ${response.status}][$uri]")
                        // Still emit this page; the empty cursor stops the next iteration.
                        Some("" -> items.map(Right.apply).toList)
                    }
                  case Left(_) =>
                    // Reached only when the status is not "ok".
                    logger.error(s"API error on fetching data from CrossRef [status ${response.status}][$uri]")
                    None
                  case Right(someError) =>
                    val cause = someError.fold(errors => errors.map(_.message).mkString(", "), ex => ex.message)
                    logger.error(s"API error on fetching data from CrossRef [status $cause][$uri]")
                    None
                }

              case None =>
                logger.error(s"Got error on fetching $uri from CrossRef")
                None
            }
          } else
            Future.successful(None)
        }.mapConcat(identity)
      }
    

    In your case you probably don't even need to push the cursor to the stream. I do that because I store the last successful cursor in a database to be able to resume later in case of failure.