
Akka-Streams collecting data (Source -> Flow -> Flow (collect) -> Sink)


I'm totally new to Scala and Akka. I have a simple RunnableFlow:

Source -> Flow (do some transformation) -> Sink.runForeach

Now I want something like this:

Source -> Flow1 (do some transformation) -> Flow2 (do some transformation) -> Sink.runForeach

But Flow2 should wait until 100 elements from Flow1 are available, then transform these 100 elements into a single new element (which needs all 100 elements from Flow1) and pass that new element to the Sink.

I did some research and found "Explicit user defined buffers", but I don't understand how I can access all 100 elements from Flow1 in Flow2 and do some transformation with them. Can someone explain it? Or, even better, post a small, simple example? Or both?


Solution

  • Akka Defined Collection

    If you don't mind using a collection type chosen by Akka, you can use the grouped function, which emits each group of elements as an immutable Seq:

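    // grouped emits an immutable.Seq of up to bufferSize elements (bufferSize is defined below);
    // an implicit Materializer (or, in Akka 2.6+, an implicit ActorSystem) must be in scope
    val stream = Source(1 to 100).via(Flow[Int].grouped(bufferSize))
                                 .runWith(Sink foreach println)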
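
    To connect this back to the question (wait for 100 elements, then turn them into one new element), here is a minimal sketch rather than a definitive implementation. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream. The object name, the doubling in flow1, the sum in flow2 and the input range 1 to 300 are all made-up placeholders for your own transformations and data:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object GroupedExample {
  // Flow1: placeholder per-element transformation (assumption: double each element).
  val flow1: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

  // Flow2: grouped(100) emits a Seq once 100 elements have arrived;
  // map then turns the whole group into a single new element (here, its sum).
  val flow2: Flow[Int, Int, NotUsed] = Flow[Int].grouped(100).map(_.sum)

  def run(): Seq[Int] = {
    // Assumption: Akka 2.6+, so the implicit ActorSystem provides the materializer.
    implicit val system: ActorSystem = ActorSystem("grouped-example")
    try Await.result(Source(1 to 300).via(flow1).via(flow2).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

    Inside the map after grouped you have access to all 100 elements at once, so any function from a Seq to your new element type can go there. Note that the last group can be smaller than 100 if the source completes early.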
    

    User Defined Collection

    If you want to control the type of collection used for your buffer, e.g. a Seq or Array:

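    import scala.reflect.ClassTag

    type MyCollectionType[X] = Array[X]
    
    // Creating an Array requires a ClassTag for the element type.
    def emptyMyCollection[X : ClassTag] : MyCollectionType[X] = Array.empty[X]
    

    Then you can perform this operation with two Flows. The first Flow uses scan to accumulate elements: it emits the collection built so far after every incoming element and starts a new collection once the previous one is full:

    val bufferSize = 10
    
    // Append i, starting from an empty collection once the previous one has reached bufferSize.
    def appendToMyCollection[X : ClassTag](coll : MyCollectionType[X], i : X) : MyCollectionType[X] = 
      (if(coll.size < bufferSize) coll else emptyMyCollection[X]) :+ i
    
    val buffer : Flow[Int, MyCollectionType[Int], _] = 
      Flow[Int].scan[MyCollectionType[Int]](emptyMyCollection[Int]){
        (coll, i) => appendToMyCollection(coll, i)
      }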
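
    To make the behaviour of scan concrete, the following sketch replays the same accumulation rule on a plain Scala List with scanLeft, which mirrors what scan does on a stream (the initial value first, then every intermediate result). It is only an illustration: the object name, the use of Vector instead of Array, the tiny bufferSize of 3 and the input 1 to 7 are assumptions chosen to keep the trace short:

```scala
object ScanTrace {
  val bufferSize = 3 // small value so the trace stays readable

  // Same rule as appendToMyCollection, written for Vector[Int] for brevity.
  def append(coll: Vector[Int], i: Int): Vector[Int] =
    (if (coll.size < bufferSize) coll else Vector.empty[Int]) :+ i

  // Yields the empty start value followed by every intermediate accumulation:
  // Vector(), Vector(1), Vector(1, 2), Vector(1, 2, 3), Vector(4), ...
  val emitted: List[Vector[Int]] =
    (1 to 7).toList.scanLeft(Vector.empty[Int])(append)
}
```

    Most of the emitted collections are only partially filled, which is why a second stage is needed to pick out the complete ones.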
    

    The second Flow filters out the partially filled collections emitted by scan and passes on only those with exactly the right size (hence "goldiLocks"):

    val goldiLocks : Flow[MyCollectionType[Int], MyCollectionType[Int],_] =
      Flow[MyCollectionType[Int]].filter(_.size == bufferSize)
    

    These two Flows can be combined to produce a Stream which will generate the desired collection type:

    // Arrays have no readable toString, so format each collection explicitly.
    val stream = Source(1 to 100).via(buffer)
                                 .via(goldiLocks)
                                 .runForeach(coll => println(coll.mkString(", ")))
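
    One difference between the two approaches is worth knowing: grouped emits a smaller final group when the source completes, whereas the scan and filter combination silently drops any trailing elements that do not fill a whole collection. The sketch below models both on plain Scala collections (not on Akka streams) with a made-up input of 25 elements; the object and value names are hypothetical:

```scala
object TailComparison {
  val bufferSize = 10
  val input: List[Int] = (1 to 25).toList

  // Models grouped: two full groups plus a final group of 5 elements.
  val viaGrouped: List[List[Int]] = input.grouped(bufferSize).toList

  // Models scan + filter: only collections of exactly bufferSize survive,
  // so elements 21 to 25 never reach the output.
  val viaScanAndFilter: List[List[Int]] =
    input
      .scanLeft(List.empty[Int])((coll, i) => (if (coll.size < bufferSize) coll else Nil) :+ i)
      .filter(_.size == bufferSize)
}
```

    If the leftover elements matter, grouped is probably the safer choice; if only complete batches are meaningful, either approach works.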