
Grouping of elements in Scala / Akka Streams


Suppose I have a Source of different fruits, and I want to insert their counts into a database.

I can do something like this:

Flow[Fruit]
  .map { item =>
    insertItemToDatabase(item)  // one database write per element
  }

But that is obviously slow: why hit the database for every item when I can group them? So I came up with a better solution:

Flow[Fruit]
  .grouped(10000)
  .map { items =>
    insertItemsToDatabase(items)  // items: Seq[Fruit] of up to 10000 elements
  }
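
To see what this batching does to the element sequence, here is the same idea with the Scala standard library's `grouped` on a plain List rather than an Akka stream. Akka's `grouped(n)` likewise emits chunks of up to n elements, with a smaller last chunk at the end of the stream. The fruit names are made-up sample data:

```scala
object GroupedDemo {
  // Hypothetical sample data standing in for the stream's elements
  val fruits: List[String] =
    List("banana", "orange", "orange", "orange", "banana")

  // Chunks of at most 2 elements; the last chunk holds the remainder
  val batches: List[List[String]] = fruits.grouped(2).toList

  def main(args: Array[String]): Unit =
    batches.foreach(batch => println(batch.mkString(", ")))
}
```

Each batch still holds every element, duplicates included. That is the memory cost the next paragraph is concerned about.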

But that means I have to hold 10,000 elements [banana, orange, orange, orange, banana, ...] in memory until they are flushed to the database. Isn't this inefficient? Perhaps I can do something like this:

Flow[Fruit]
  .grouped(100)
  .map { items =>
    consolidate(items)  // returns Map[String, Int]: fruit name -> count in this batch
  }
  .grouped(100)
  // here I have Seq[Map[String, Int]]
  .map { mapsOfItems =>
    insertMapToDatabase(mapsOfItems)
  }
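
The question doesn't show `consolidate`. Below is a minimal sketch, assuming fruits are represented by their names as `String` to match the `Map[String, Int]` in the comment. It also includes a hypothetical `mergeCounts` that sums the `Seq[Map[String, Int]]` from the second `grouped(100)` into one map, so each key would appear only once. Neither helper comes from the question or from Akka:

```scala
object BatchCounts {
  // Hypothetical helper: count occurrences of each fruit name in one batch
  def consolidate(items: Seq[String]): Map[String, Int] =
    items.foldLeft(Map.empty[String, Int]) { (counts, fruit) =>
      counts + (fruit -> (counts.getOrElse(fruit, 0) + 1))
    }

  // Hypothetical helper: sum per-batch counts into one map,
  // so each fruit name appears only once in the result
  def mergeCounts(batches: Seq[Map[String, Int]]): Map[String, Int] =
    batches.foldLeft(Map.empty[String, Int]) { (total, batch) =>
      batch.foldLeft(total) { case (acc, (fruit, count)) =>
        acc + (fruit -> (acc.getOrElse(fruit, 0) + count))
      }
    }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("banana", "orange", "orange"), Seq("orange", "banana"))
    println(mergeCounts(batches.map(consolidate)))
  }
}
```

A batch of 100 elements collapses to at most one entry per distinct fruit. The merged map has one entry per distinct fruit no matter how many batches went into it.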

From my understanding, this should also process 10,000 elements at once but take up much less memory (provided the elements repeat often). But each key is still repeated up to 100 times in memory, once per consolidated map. Sure, I could do .grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map()..., but isn't there a better way? Perhaps something like this:

Flow[Fruit]
  .map { item =>
    addToMap(item)               // updates a mutable myMap held outside the stream
    if (myMap.size == 10000) {   // flush once the map holds 10,000 distinct keys
      insertToDatabase(myMap)
      clearMyMap()
    }
  }

But doesn't this break the concept of Akka Streams, namely the independence (and therefore concurrency) of processing stages?
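
One way to keep the flush-on-threshold idea without shared mutable state is to write it as a pure step function and thread the state through explicitly. In Akka Streams, that state could then live inside a single stage instead of in a map shared with the outside world. `statefulMapConcat` is one operator that allows this. The sketch below shows only the step logic on a plain Seq. The names `step`, `run`, and `maxKeys` are hypothetical, and fruits are assumed to be `String` names:

```scala
object ThresholdFlush {
  type Counts = Map[String, Int]

  // Adds one fruit to the running counts. Once the map holds maxKeys
  // distinct fruits, it is returned for flushing and the state resets.
  def step(maxKeys: Int)(counts: Counts, fruit: String): (Counts, Option[Counts]) = {
    val updated = counts + (fruit -> (counts.getOrElse(fruit, 0) + 1))
    if (updated.size >= maxKeys) (Map.empty[String, Int], Some(updated))
    else (updated, None)
  }

  // Applies step to every element, collecting flushed maps in order and
  // emitting whatever is left over once the input ends.
  def run(maxKeys: Int, fruits: Seq[String]): List[Counts] = {
    val (rest, flushedReversed) =
      fruits.foldLeft((Map.empty[String, Int], List.empty[Counts])) {
        case ((counts, out), fruit) =>
          val (next, toFlush) = step(maxKeys)(counts, fruit)
          (next, toFlush.toList ::: out)
      }
    val all = if (rest.isEmpty) flushedReversed else rest :: flushedReversed
    all.reverse
  }

  def main(args: Array[String]): Unit =
    run(2, Seq("banana", "orange", "orange", "banana", "apple")).foreach(println)
}
```

With maxKeys = 2, the first two distinct fruits trigger a flush and the counts reset. The trailing "apple" ends up in a final partial map. In a real stream, that final map also has to be emitted when the upstream completes. This sketch doesn't cover that, so check what your Akka version offers for it.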


Solution

  • If the cardinality of the Fruit set is low, you can keep a single Map with all of the counts and flush it to the database after streaming through all of the Fruit values.

    First, construct a Flow that will keep the running count:

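    import akka.NotUsed
    import akka.stream.scaladsl.Flow

    type Count = Int

    type FruitCount = Map[Fruit, Count]

    // Empty counts; looking up an unseen fruit yields 0
    val zeroCount : FruitCount =
      Map.empty[Fruit, Count] withDefaultValue 0

    // The inner parentheses are required: without them the tuple
    // `fruit -> fruitCount(fruit)` is built first and `+ 1` is applied to it
    val appendFruitToCount : (FruitCount, Fruit) => FruitCount =
      (fruitCount, fruit) => fruitCount + (fruit -> (fruitCount(fruit) + 1))

    // Emits the running count after every element
    val fruitCountFlow : Flow[Fruit, FruitCount, NotUsed] =
      Flow[Fruit].scan(zeroCount)(appendFruitToCount)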
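
To see what `fruitCountFlow` emits, you can apply the same `zeroCount` and `appendFruitToCount` with the standard library's `scanLeft` on a plain List. Like Akka's `scan`, it emits the zero value first and then one running count per element. This sketch runs outside Akka, and the `Fruit` ADT is a made-up stand-in for the question's type:

```scala
object ScanDemo {
  // Hypothetical stand-in for the question's Fruit type
  sealed trait Fruit
  case object Banana extends Fruit
  case object Orange extends Fruit

  type Count = Int
  type FruitCount = Map[Fruit, Count]

  val zeroCount: FruitCount = Map.empty[Fruit, Count].withDefaultValue(0)

  val appendFruitToCount: (FruitCount, Fruit) => FruitCount =
    (fruitCount, fruit) => fruitCount + (fruit -> (fruitCount(fruit) + 1))

  val fruits: List[Fruit] = List(Banana, Orange, Orange)

  // One running count per prefix of the input, starting with zeroCount
  val runningCounts: List[FruitCount] = fruits.scanLeft(zeroCount)(appendFruitToCount)

  def main(args: Array[String]): Unit = runningCounts.foreach(println)
}
```

Only the last of these running counts is needed for the database write.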
    

    Now create a Sink that will receive the last FruitCount and materialize the stream:

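    import scala.concurrent.Future
    import akka.stream.scaladsl.{Sink, Source}

    // Materializes to a Future of the last element, if any
    val lastFruitCountSink : Sink[FruitCount, Future[Option[FruitCount]]] =
      Sink.lastOption[FruitCount]

    val fruitSource : Source[Fruit, NotUsed] = ???

    // runWith keeps the sink's materialized value (`.to(...).run()` would
    // return NotUsed). Needs an implicit Materializer, or on Akka 2.6+ an
    // implicit ActorSystem, in scope.
    val lastFruitCountFut : Future[Option[FruitCount]] =
      fruitSource
        .via(fruitCountFlow)
        .runWith(lastFruitCountSink)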
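
Only the last running count is used, so a fold would give the same final map without emitting the intermediate ones. In Akka Streams that would be `Sink.fold(zeroCount)(appendFruitToCount)`, which materializes a `Future[FruitCount]` directly. The sketch below checks this equivalence with the standard library's `foldLeft` and `scanLeft` on plain lists rather than running a stream. The `Fruit` ADT and sample data are made up:

```scala
object FoldVsScan {
  // Hypothetical stand-in for the question's Fruit type
  sealed trait Fruit
  case object Banana extends Fruit
  case object Orange extends Fruit

  type FruitCount = Map[Fruit, Int]

  val zeroCount: FruitCount = Map.empty[Fruit, Int].withDefaultValue(0)

  val appendFruitToCount: (FruitCount, Fruit) => FruitCount =
    (fruitCount, fruit) => fruitCount + (fruit -> (fruitCount(fruit) + 1))

  val fruits: List[Fruit] = List(Orange, Banana, Orange, Orange)

  // What scan + lastOption computes: the last running count
  val viaScan: Option[FruitCount] =
    fruits.scanLeft(zeroCount)(appendFruitToCount).lastOption

  // What a fold computes: only the final count, no intermediate maps
  val viaFold: FruitCount = fruits.foldLeft(zeroCount)(appendFruitToCount)

  def main(args: Array[String]): Unit = println(viaFold)
}
```

With a fold, the Option layer disappears as well, because a fold always completes with a value (the zero for an empty input).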
    

    The lastFruitCountFut can then be used to send values to the database:

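    import scala.concurrent.ExecutionContext.Implicits.global  // or any ExecutionContext

    lastFruitCountFut.foreach { maybeCount =>        // the Future has completed
      maybeCount.foreach { fruitCount =>             // the stream produced a value
        fruitCount.foreach { case (fruit, count) =>  // one entry per distinct fruit
          insertItemsToDatabase(Iterator.fill(count)(fruit))
        }
      }
    }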
    

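    An Iterator is used because Iterator.fill(count)(fruit) produces the repeated Fruit values lazily, one at a time. The copies never have to exist together in a collection, provided insertItemsToDatabase accepts a TraversableOnce[Fruit] and consumes it incrementally.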

    This solution keeps only one Map in memory, with one key per distinct Fruit type and one Int count per key.
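
A small check of the Iterator laziness described above: `Iterator.fill` takes its element by name and evaluates it only as the iterator advances. The evaluation counter, the sample values, and the `expand` helper (which turns a count map back into individual items) are made up for illustration:

```scala
object IteratorFillDemo {
  // Counts how many times the fill expression has been evaluated
  var evaluations = 0

  // Nothing is evaluated yet: Iterator.fill takes `elem` by name
  val bananas: Iterator[String] = Iterator.fill(3) { evaluations += 1; "banana" }

  val before: Int = evaluations    // no element produced yet
  val first: String = bananas.next()
  val afterOne: Int = evaluations  // only one element produced so far

  // Hypothetical helper: expand a count map back into items, one at a time
  def expand(counts: Map[String, Int]): Iterator[String] =
    counts.iterator.flatMap { case (fruit, count) => Iterator.fill(count)(fruit) }

  def main(args: Array[String]): Unit =
    println(expand(Map("orange" -> 2)).toList)
}
```

Whether the copies actually stay out of memory still depends on the consumer. If insertItemsToDatabase called `toList` on the iterator, for example, all the copies would be materialized at once.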