scala functional-programming fs2

Interleave multiple streams


I have a list of streams, List[Stream[_]]. The size of the list is known at the start of the function, and each stream has either n or n+1 elements. I'd like to obtain a single interleaved stream, e.g.

def myMagicFold[A](s: List[Stream[A]]): Stream[A]

val streams = List(Stream(1,1,1),Stream(2,2,2),Stream(3,3),Stream(4,4)) 

val result = myMagicFold(streams)

//result = Stream(1,2,3,4,1,2,3,4,1,2)

I'm using fs2.Stream. My first take:

val result = streams.fold(fs2.Stream.empty){
   case (s1, s2) => s1.interleaveAll(s2)
}

// result = Stream(1, 4, 3, 4, 2, 3, 1, 2, 1, 2)

I'm looking for a solution based on basic operations (map, fold,...)


Solution

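  • Your initial guess was good, but interleaveAll flattens too soon: each fold step merges the accumulated result with the next stream into one flat stream, so later streams are interleaved with elements that are already mixed. That's why you don't get the expected order. Here's code that should do what you're trying to achieve: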

    
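      import fs2.Stream

      // Zips all streams row by row, then emits each row in the original stream order.
      def zipAll[F[_], A](streams: List[Stream[F, A]]): Stream[F, A] =
        streams
          .foldLeft[Stream[F, List[Option[A]]]](Stream.empty) { (acc, s) =>
            zipStreams(acc, s)
          }
          // Rows are built by prepending, so reverse them; flatten drops the None padding.
          .flatMap(l => Stream.emits(l.reverse.flatten))

      // Prepends the next element of s2 (or None once s2 is exhausted) to each row of s1.
      def zipStreams[F[_], A](s1: Stream[F, List[Option[A]]], s2: Stream[F, A]): Stream[F, List[Option[A]]] =
        s1.zipAllWith(s2.map(Option(_)))(Nil, Option.empty[A]) { case (acc, a) => a :: acc }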
    
    

    Here, the n-th element of each stream is collected into a list (one list per position), and each list is then emitted as a stream and flattened into the result. Since fs2.Stream is pull-based, you only have one list in memory at a time.
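
    To see the difference between the two approaches without pulling in fs2, here is a rough model that uses plain Scala lists in place of finite streams. It is only a sketch: the local interleaveAll is a hand-written stand-in for the pairwise behaviour described in the question (not the fs2 method), roundRobin is a hypothetical name, and the eager padTo/transpose gives up the one-row-at-a-time memory behaviour of the fs2 version.

```scala
// Plain-Scala model (no fs2): each finite stream is represented as a List[A].

// Local stand-in for a pairwise interleave that keeps the leftovers
// of the longer side (assumption: this mirrors the question's usage).
def interleaveAll[A](xs: List[A], ys: List[A]): List[A] =
  (xs, ys) match {
    case (x :: xt, y :: yt) => x :: y :: interleaveAll(xt, yt)
    case (Nil, rest)        => rest
    case (rest, Nil)        => rest
  }

// Row-based round robin (hypothetical helper): pad every list with None
// up to the longest length, gather the k-th elements into one row,
// then drop the padding while flattening the rows.
def roundRobin[A](streams: List[List[A]]): List[A] = {
  val width  = streams.map(_.length).maxOption.getOrElse(0)
  val padded = streams.map(s => s.map(a => Option(a)).padTo(width, None))
  padded.transpose.flatMap(row => row.flatten)
}

val streams = List(List(1, 1, 1), List(2, 2, 2), List(3, 3), List(4, 4))

// Folding pairwise mixes each new list into an already-mixed result.
val folded = streams.foldLeft(List.empty[Int])((acc, s) => interleaveAll(acc, s))
// folded == List(1, 4, 3, 4, 2, 3, 1, 2, 1, 2)

// Building rows first keeps the original list order within each row.
val rows = roundRobin(streams)
// rows == List(1, 2, 3, 4, 1, 2, 3, 4, 1, 2)
```

    The folded result matches the unexpected order from the question, while the row-based result matches the desired one. The fs2 code above follows the same row-first idea, but builds the rows lazily.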