scala, akka-stream

Converting paging function to a Flow


I have a large number of SQLite databases, represented as a Source[File, NotUsed]. For each database, I want to paginate through the query results. Memory limits mean I cannot load them eagerly. If the result type is Foo, I'm trying to figure out how to create a Flow[File, Foo, NotUsed] that internally makes a lazy, recursive call on the resource.

I see that the Source.unfold method allows me to page lazily, but it can only create a Source, which means I can't feed it the File it needs as input. I can't see how to convert a Source to a Flow (except via fromSinkAndSource, but that doesn't pipe the values through). I'm not sure whether this line of inquiry will lead anywhere.

It was suggested to me that I should use the GraphDSL and Merge, but I'm stuck trying to understand how many input ports the Merge should have and how I would actually wire it together.


Solution

  • I think you're looking for the flatMapConcat operator:

    Signature

    def flatMapConcat[T, M](f: Out ⇒ Graph[SourceShape[T], M]): Repr[T]
    

    Description

    Transform each input element into a Source whose elements are then flattened into the output stream through concatenation. This means each source is fully consumed before consumption of the next source starts.

    emits when the current consumed substream has an element available

    backpressures when downstream backpressures

    completes when upstream completes and all consumed substreams complete
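
Applied to the question, a minimal sketch could combine flatMapConcat with Source.unfold: each File becomes a lazily paged Source, and the sources are concatenated in order. This assumes Akka 2.6 (where an implicit ActorSystem supplies the materializer). The names Foo, PageSize, fetchPage, pages and pagingFlow are hypothetical, and fetchPage is a stand-in that fabricates five rows per database instead of running a real paged SQLite query.

```scala
import java.io.File

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical result type; the question only calls it Foo.
final case class Foo(db: String, id: Int)

object PagingFlow {
  val PageSize = 2

  // Hypothetical stand-in for a paged query (e.g. LIMIT/OFFSET in SQLite).
  // It pretends every database holds exactly five rows, ids 0 to 4.
  def fetchPage(db: File, offset: Int): Vector[Foo] =
    (offset until math.min(offset + PageSize, 5))
      .map(i => Foo(db.getName, i))
      .toVector

  // One lazily evaluated Source per database: unfold requests the next
  // page only when downstream demands more elements.
  def pages(db: File): Source[Foo, NotUsed] =
    Source
      .unfold(0) { offset =>
        val page = fetchPage(db, offset)
        if (page.isEmpty) None               // no more rows: complete this Source
        else Some((offset + page.size, page)) // next state, emitted page
      }
      .mapConcat(page => page)               // flatten each page into single Foo elements

  // The Flow the question asks for: each File is fully paged through
  // before the next File is started.
  val pagingFlow: Flow[File, Foo, NotUsed] =
    Flow[File].flatMapConcat(db => pages(db))
}

object PagingDemo {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("paging-demo")
    val result = Source(List(new File("a.db"), new File("b.db")))
      .via(PagingFlow.pagingFlow)
      .runWith(Sink.seq)
    println(Await.result(result, 5.seconds))
    system.terminate()
  }
}
```

Because unfold is driven by downstream demand, only the pages that have actually been requested are in memory at once. A real fetchPage would block on database I/O, so it may be worth running that stage on a dedicated dispatcher; that detail is left out of the sketch.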