I have a Source[Animal], where Animal is one of two types, Cat or Dog. The source is something like dog1, dog2, dog3, cat1, dog4, dog5, cat2, cat3, dog6, dog7, dog8, dog9, dog10, dog11, dog12, cat4 ...
I'm trying to convert it to the following Source[Seq[Animal]] - (dog1, dog2, dog3, cat1), (dog4, dog5, cat2), (cat3), (dog6, dog7, dog8), (dog9, dog10, dog11), (dog12, cat4) ...
How this works is:
groupedWithin(4, FiniteDuration(3, SECONDS))
I've been trying things with batchWeighted and groupedWithin, but I don't have a proper solution yet.
One idea I tried was to weigh Dog as 1 and Cat as 1000 and use batchWeighted with max weight = 1003, but this won't ensure that Cat is always the last batch element... Trying the same with max weight = 3 always puts Cat in separate groups.
If there were a hybrid of batchWithin and takeWhile (without termination), it might have solved this use case.
This is pretty straightforward to solve if it's just iterating over a List, but being constrained to use FlowOps makes it a bit challenging.
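For comparison, here is a minimal sketch of the List-only version, ignoring the time window entirely. The Animal, Dog and Cat definitions, the batchOnCat helper and its maxSize parameter are all assumptions made for illustration; they are not from any library. A batch is closed as soon as a Cat arrives or once it holds maxSize elements.

```scala
// Hypothetical model of the animals; the real types may look different.
sealed trait Animal
final case class Dog(name: String) extends Animal
final case class Cat(name: String) extends Animal

object ListBatching {
  // Hypothetical helper: closes a batch on every Cat, or once it holds maxSize elements.
  // There is no time window here; this is only the count/delimiter part of the rule.
  def batchOnCat(animals: List[Animal], maxSize: Int): List[List[Animal]] = {
    require(maxSize > 0)
    val (done, open) =
      animals.foldLeft((List.empty[List[Animal]], List.empty[Animal])) {
        case ((batches, current), a) =>
          val next = a :: current // batches are built in reverse for O(1) prepend
          val close = a.isInstanceOf[Cat] || next.size >= maxSize
          if (close) (next.reverse :: batches, Nil) else (batches, next)
      }
    // Keep a trailing, still-open batch if the input did not end on a boundary.
    val all = if (open.isEmpty) done else open.reverse :: done
    all.reverse
  }
}
```

With maxSize = 4, the input dog1, dog2, dog3, cat1, dog4, cat2, cat3 yields (dog1, dog2, dog3, cat1), (dog4, cat2), (cat3). The hard part in a stream is that the same bookkeeping has to interact with a timer, which a plain fold cannot express.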
Edit: Currently I'm doing the following:
.groupedWithin(4, FiniteDuration(4, SECONDS))
.map(frameBatch(_, Vector(), 0))
// groupedWithin internally returns a Vector so is fast for indexed operations
import scala.annotation.tailrec

@tailrec
private def frameBatch(
    items: Seq[Animal],
    result: Vector[Seq[Animal]],
    offset: Int
): Vector[Seq[Animal]] = {
  // Find the next Cat at or after offset (assume there's an isDog() for simplicity)
  val index = items.indexWhere(!_.isDog, offset)
  if (index == -1) {
    // No more Cats: emit whatever remains (if anything) as the final batch
    if (offset >= items.size) result else result :+ items.drop(offset)
  } else {
    // Close the batch at the Cat, keeping the Cat as its last element
    frameBatch(items, result :+ items.slice(offset, index + 1), index + 1)
  }
}
It's possible to do this with just the Akka Stream primitives (with a minor detour into Akka Actors):
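import akka.actor.{ActorSystem, Cancellable}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.Source
import scala.concurrent.duration.FiniteDuration

object BatchFrame {
  def batchFrame[M](
      source: Source[Animal, M],
      batchSize: Int,
      interval: FiniteDuration)(implicit system: ActorSystem): Source[Seq[Animal], M] = {
    require(batchSize > 0)

    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    // Data elements are wrapped in Some; timer ticks arrive as None.
    val dataSource = source.map(x => Some(x))
    // Prematerialize the tick source so its ActorRef is available to the scheduler.
    val (timerRef, timerSource) = Source.actorRef[Any](1, OverflowStrategy.dropHead).map(_ => None).preMaterialize()
    val merged = dataSource.merge(timerSource, eagerComplete = true)

    var nextTick: Option[Cancellable] = None

    // Cancel any pending tick and schedule a fresh one, restarting the time window.
    def scheduleTick(): Unit = {
      nextTick = nextTick.flatMap { c => c.cancel(); None }
      nextTick = Option(system.scheduler.scheduleOnce(interval, timerRef, None))
    }

    scheduleTick()

    merged.statefulMapConcat { () =>
      var dogCount = 0
      var frame: List[Animal] = Nil // current batch, newest element first

      // Emit the current frame in arrival order, then reset state and the timer.
      def emit(): List[Seq[Animal]] = {
        scheduleTick()
        val ret = List(frame.reverse)
        dogCount = 0
        frame = Nil
        ret
      }

      def emitWith(a: Animal): List[Seq[Animal]] = {
        frame = a :: frame
        emit()
      }

      in: Option[Animal] => {
        in match {
          case Some(cat: Cat) =>
            // A Cat always closes the frame.
            emitWith(cat)
          case Some(dog: Dog) if dogCount < (batchSize - 1) =>
            dogCount += 1
            frame = dog :: frame
            Nil
          case Some(dog: Dog) =>
            // This Dog fills the frame.
            emitWith(dog)
          case _ =>
            // Timer tick: flush whatever has accumulated (possibly an empty frame).
            emit()
        }
      }
    }
  }
}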
The main trick (which I had to look up and experiment with to prove it to myself) is to prematerialize the timing Source so you have the ActorRef available for scheduling ticks.