I have a list of streams, List[Stream[_]]. The size of the list is known at the beginning of the function, and the size of each stream is either n or n+1. I'd like to obtain an interleaved stream, e.g.:
def myMagicFold[A](s: List[Stream[A]]): Stream[A]
val streams = List(Stream(1,1,1),Stream(2,2,2),Stream(3,3),Stream(4,4))
val result = myMagicFold(streams)
//result = Stream(1,2,3,4,1,2,3,4,1,2)
I'm using fs2.Stream. My first take:
val result = streams.fold(fs2.Stream.empty) {
  case (s1, s2) => s1.interleaveAll(s2)
}
// result = Stream(1, 4, 3, 4, 2, 3, 1, 2, 1, 2)
I'm looking for a solution based on basic operations (map, fold, ...).
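Your initial guess was good. However, folding with interleaveAll flattens too soon: each step interleaves the next stream with the already-merged result rather than with the original streams, which is why you don't get the expected order.
Here's code that should do what you're trying to achieve: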
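import fs2.Stream

// Collects the n-th element of every stream into one list per position,
// then emits each list in the original stream order.
def zipAll[F[_], A](streams: List[Stream[F, A]]): Stream[F, A] =
  streams
    .foldLeft[Stream[F, List[Option[A]]]](Stream.empty) { (acc, s) =>
      zipStreams(acc, s)
    }
    // Elements were prepended, so reverse; flatten drops the None padding.
    .flatMap(l => Stream.emits(l.reverse.flatten))

// Prepends the next element of s2 (or None once s2 is exhausted) to each accumulated list.
def zipStreams[F[_], A](s1: Stream[F, List[Option[A]]], s2: Stream[F, A]): Stream[F, List[Option[A]]] =
  s1.zipAllWith(s2.map(Option(_)))(Nil, Option.empty[A]) { case (acc, a) => a :: acc }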
In this case, you're collecting the n-th element of each stream into a list and then converting that list to a Stream, which is later flattened into the result stream.
Since fs2.Stream is pull-based, you only have one list in memory at a time.
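To see the difference between the two approaches without an fs2 dependency, here is a minimal sketch that uses the standard-library LazyList (Scala 2.13+) as a stand-in for fs2.Stream. The names InterleaveSketch, interleaveColumns, interleaveAll2 and interleavePairwise are hypothetical and not part of fs2; interleaveAll2 only approximates what interleaveAll does for two streams.

```scala
// Stand-in sketch: LazyList plays the role of fs2.Stream here (assumption).
object InterleaveSketch {

  // Column-wise approach: gather the n-th elements of all inputs into one list,
  // padding exhausted inputs with None, then emit each list in input order.
  def interleaveColumns[A](streams: List[LazyList[A]]): LazyList[A] =
    streams
      .foldLeft(LazyList.empty[List[Option[A]]]) { (acc, s) =>
        // zipAll pads the shorter side: Nil for acc, None for s.
        acc.zipAll(s.map(Option(_)), Nil, Option.empty[A]).map { case (col, a) => a :: col }
      }
      // Columns were built by prepending, so reverse; flatten drops the padding.
      .flatMap(col => col.reverse.flatten)

  // Two-stream interleave that appends the remainder of the longer input.
  def interleaveAll2[A](a: LazyList[A], b: LazyList[A]): LazyList[A] =
    if (a.isEmpty) b else a.head #:: interleaveAll2(b, a.tail)

  // Pairwise fold, mirroring the first take from the question.
  def interleavePairwise[A](streams: List[LazyList[A]]): LazyList[A] =
    streams.foldLeft(LazyList.empty[A])((acc, s) => interleaveAll2(acc, s))

  val streams: List[LazyList[Int]] =
    List(LazyList(1, 1, 1), LazyList(2, 2, 2), LazyList(3, 3), LazyList(4, 4))

  def main(args: Array[String]): Unit = {
    println(interleaveColumns(streams).toList)  // List(1, 2, 3, 4, 1, 2, 3, 4, 1, 2)
    println(interleavePairwise(streams).toList) // List(1, 4, 3, 4, 2, 3, 1, 2, 1, 2)
  }
}
```

The pairwise version reproduces the scrambled order from the question because each fold step interleaves a new stream with an already-merged one. The column-wise version keeps elements grouped by position until the very end, which is the same idea the fs2 code relies on.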