
Batching / Framing in Akka Stream


I have a Source[Animal], where Animal has two subtypes, Cat and Dog. The source looks something like dog1, dog2, dog3, cat1, dog4, dog5, cat2, cat3, dog6, dog7, dog8, dog9, dog10, dog11, dog12, cat4 ... I'm trying to convert it to the following Source[Seq[Animal]]: (dog1, dog2, dog3, cat1), (dog4, dog5, cat2), (cat3), (dog6, dog7, dog8), (dog9, dog10, dog11), (dog12, cat4) ... The rules are:

  • at most 3 dogs per batch, at most 1 cat per batch (alternatively a solution for the following is also fine: at most 4 animals per batch, at most 1 cat per batch)
  • a cat must only be the final (aka framing) element in a batch
  • also, I can't show timing in an example, but there should be a timeout after which the batch is emitted even if it is not full and contains no cat. Something like groupedWithin(4, FiniteDuration(3, SECONDS))
  • overall order is important and must be maintained

I've been experimenting with batchWeighted and groupedWithin, but I don't have a proper solution yet.

One idea I tried was to weigh Dog as 1 and Cat as 1000 and use batchWeighted with max weight = 1003, but this won't ensure that Cat is always the last batch element... Trying the same with max weight = 3 always puts Cat in separate groups.

If there were a hybrid of batchWithin and takeWhile (without termination), it might have solved this use case.

This is pretty straightforward to solve if it's just iterating over a List, but being constrained to FlowOps makes it a bit challenging.
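For reference, here is a minimal sketch of the plain List version of the rules (no timeout), mainly to pin down the expected output. The Animal, Dog and Cat definitions and the names ListFraming, frame and maxDogs are assumptions made for illustration; the real types are not shown in the question.

```scala
// Hypothetical stand-ins for the question's types.
sealed trait Animal
final case class Dog(name: String) extends Animal
final case class Cat(name: String) extends Animal

object ListFraming {
  // A cat closes the current batch and is included as its last element.
  // A dog arriving when the open batch already holds maxDogs dogs closes
  // that batch and starts a new one.
  def frame(animals: List[Animal], maxDogs: Int = 3): List[List[Animal]] = {
    require(maxDogs > 0, "maxDogs must be positive")
    // State: completed batches (newest first) and the open batch (newest
    // element first). The open batch only ever contains dogs.
    val (done, open) =
      animals.foldLeft((List.empty[List[Animal]], List.empty[Animal])) {
        case ((batches, current), cat: Cat) =>
          ((cat :: current).reverse :: batches, Nil)
        case ((batches, current), dog: Dog) if current.size == maxDogs =>
          (current.reverse :: batches, List(dog))
        case ((batches, current), dog: Dog) =>
          (batches, dog :: current)
      }
    // Flush whatever is still open at the end of the input.
    val all = if (open.isEmpty) done else open.reverse :: done
    all.reverse
  }
}
```

Run on the sample sequence above, this reproduces the six batches listed in the question. Note that it needs one element of lookahead (a full batch of dogs stays open until the next animal arrives), which is exactly what makes the streaming version awkward without a timeout.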

Edit: Currently I'm doing the following:

  .groupedWithin(4, FiniteDuration(4, SECONDS))
  // groupedWithin emits a Vector internally, so indexed operations are fast
  .mapConcat(frameBatch(_, Vector(), 0)) // mapConcat flattens the frames back into the stream

  // Splits one timed group into frames, each ending at (and including) a cat.
  @tailrec
  private def frameBatch(
      items: Seq[Animal],
      result: Vector[Seq[Animal]],
      offset: Int
    ): Vector[Seq[Animal]] = {
    val index = items.indexWhere(!_.isDog, offset) // assume there's an isDog() for simplicity
    if (index == -1) {
      // no more cats: emit the remaining dogs, if any
      if (offset >= items.size) result
      else result :+ items.slice(offset, items.size)
    } else {
      // the cat at `index` is the final element of this frame
      frameBatch(items, result :+ items.slice(offset, index + 1), index + 1)
    }
  }

Solution

  • It's possible to do this with just the Akka Stream primitives (with a minor detour into Akka Actors):

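    import akka.actor.{ActorSystem, Cancellable}
    import akka.stream.{ActorMaterializer, OverflowStrategy}
    import akka.stream.scaladsl.Source
    import scala.concurrent.duration.FiniteDuration

    object BatchFrame {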
      def batchFrame[M](
        source: Source[Animal, M],
        batchSize: Int,
        interval: FiniteDuration)(implicit system: ActorSystem): Source[Seq[Animal], M] = {
    
        require(batchSize > 0)
    
        import system.dispatcher
    
        implicit val materializer = ActorMaterializer()
    
        val dataSource = source.map(x => Some(x))
        val (timerRef, timerSource) = Source.actorRef[Any](1, OverflowStrategy.dropHead).map(_ => None).preMaterialize()
    
        val merged = dataSource.merge(timerSource, eagerComplete = true)
    
        var nextTick: Option[Cancellable] = None
    
        def scheduleTick(): Unit = {
          nextTick = nextTick.flatMap { c => c.cancel(); None }
          nextTick = Option(system.scheduler.scheduleOnce(interval, timerRef, None))
        }
    
        scheduleTick()
    
        merged.statefulMapConcat{ () =>
          var dogCount = 0
          var frame: List[Animal] = Nil
    
          def emit(): List[Seq[Animal]] = {
            scheduleTick()
            val ret = List(frame.reverse)
            dogCount = 0
            frame = Nil
            ret
          }
    
          def emitWith(a: Animal): List[Seq[Animal]] = {
            frame = a :: frame
            emit()
          }
    
          in: Option[Animal] => {
            in match {
              case Some(cat: Cat) =>
                emitWith(cat)
              case Some(dog: Dog) if dogCount < (batchSize - 1) =>
                dogCount += 1
                frame = dog :: frame
                Nil
              case Some(dog: Dog) =>
                emitWith(dog)
              case _ =>
                emit()
            }
          }
        }
      }
    }
    

    The main trick (which I had to look up and experiment with to prove it to myself) is to prematerialize the timing Source so you have the ActorRef available for scheduling ticks.
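
The framing decisions inside statefulMapConcat can be checked without a running stream. Below is a rough sketch that replays the same match as a pure fold over Option[Animal] events, where None stands for a timer tick. The Animal, Dog and Cat definitions and the names FramingStep, step and run are assumptions for illustration, and the scheduler side effects are left out entirely.

```scala
// Hypothetical stand-ins for the question's types.
sealed trait Animal
final case class Dog(name: String) extends Animal
final case class Cat(name: String) extends Animal

object FramingStep {
  // One transition: takes the open frame (newest element first) and an
  // event, returns the new open frame and the batches to emit (0 or 1).
  def step(batchSize: Int)(
      frame: List[Animal],
      in: Option[Animal]): (List[Animal], List[Seq[Animal]]) =
    in match {
      case Some(cat: Cat) =>
        (Nil, List((cat :: frame).reverse)) // a cat always closes the frame
      case Some(dog: Dog) if frame.size < batchSize - 1 =>
        (dog :: frame, Nil)                 // room left: keep accumulating
      case Some(dog: Dog) =>
        (Nil, List((dog :: frame).reverse)) // this dog fills the frame
      case None =>
        (Nil, List(frame.reverse))          // tick: flush, even if empty
    }

  // Replays a list of events and collects everything that was emitted.
  def run(batchSize: Int, events: List[Option[Animal]]): List[Seq[Animal]] = {
    val (_, out) =
      events.foldLeft((List.empty[Animal], Vector.empty[Seq[Animal]])) {
        case ((frame, emitted), in) =>
          val (next, batches) = step(batchSize)(frame, in)
          (next, emitted ++ batches)
      }
    out.toList
  }
}
```

Two things become visible this way. With batchSize = 4 the logic implements the alternative rule from the question (at most 4 animals per batch, cat last), since a full batch is emitted as soon as the last dog arrives rather than waiting for a possible cat. And a tick that arrives while the frame is empty emits an empty batch, so a downstream filter(_.nonEmpty) may be worth adding if empty batches are unwanted.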