scala, functional-programming, scala-cats, fs2

Schedule a computation concurrently for all elements of an fs2.Stream


I have an fs2.Stream of elements (possibly infinite), and I want to schedule a repeating computation for every element so that the computations run concurrently with each other. Here is what I tried:

import cats.effect.{ContextShift, IO, Timer}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)

val stream = for {
  id <- fs2.Stream.emits(List(1, 2)).covary[IO]
  _ <- fs2.Stream.awakeEvery[IO](1.second)
  _ <- fs2.Stream.eval(IO(println(id)))
} yield ()

stream.compile.drain.unsafeRunSync()

The program output looks like

1
1
1
etc...

which is not what I expected: the second element is never printed. I'd like the scheduled computations for all elements of the original stream to be interleaved, rather than having each one wait for the previous element's stream to terminate (which never happens, because the scheduling is infinite).


Solution
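
To see why the original version only ever prints 1, it may help to look at a pure, finite analogue. A for-comprehension over streams desugars to flatMap, which emits everything from the inner stream for one element before moving on to the next. The sketch below uses a hypothetical object name, SequentialFlatMap, and replaces the infinite awakeEvery stream with three repeated values; both are assumptions made purely for illustration:

```scala
import fs2.Stream

object SequentialFlatMap {
  // Pure, finite stand-in for the question's stream: three "ticks" per id.
  // flatMap concatenates the inner streams in order, one id at a time.
  val ticks: List[Int] =
    Stream(1, 2).flatMap(id => Stream(id, id, id)).toList

  def main(args: Array[String]): Unit =
    println(ticks) // List(1, 1, 1, 2, 2, 2)
}
```

If the inner stream for id 1 were infinite, as awakeEvery is, the elements for id 2 would never be reached.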

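Evaluate the per-element effects concurrently rather than sequencing them in a single for-comprehension. One option is to build a stream of IO actions and run up to 5 of them at a time with parEvalMapUnordered:

import cats.implicits._ // syntax such as >>
import fs2.Stream

val str = for {
  id <- Stream.emits(List(1, 5, 7)).covary[IO]
  res = timer.sleep(id.seconds) >> IO(println(id))
} yield res

val stream = str.parEvalMapUnordered(5)(identity)

stream.compile.drain.unsafeRunSync()

Alternatively, turn each element into its own stream and run all of them concurrently with parJoinUnbounded:

val stream = Stream.emits(List(1, 5, 7))
  .covary[IO]
  .map { id =>
    Stream.eval(timer.sleep(id.seconds) >> IO(println(id)))
  }
  .parJoinUnbounded

stream.compile.drain.unsafeRunSync()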
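
Both snippets above print each id once after a delay. To get the repeating, interleaved schedule from the question, the same parJoinUnbounded idea can be combined with awakeEvery. The following is a minimal sketch, assuming fs2 2.x with cats-effect 2.x (as implied by the ContextShift and Timer in the question); the names InterleavedTicks and tickAll are hypothetical, and take(6) is added only so that the example terminates:

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object InterleavedTicks {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)

  // Hypothetical helper: builds one infinite ticking stream per id
  // and runs all of them concurrently, emitting the id on every tick.
  def tickAll(ids: List[Int], period: FiniteDuration): Stream[IO, Int] =
    Stream
      .emits(ids)
      .covary[IO]
      .map(id => Stream.awakeEvery[IO](period).as(id))
      .parJoinUnbounded

  def main(args: Array[String]): Unit =
    tickAll(List(1, 2), 1.second)
      .evalMap(id => IO(println(id)))
      .take(6) // stop after six ticks; remove for an endless schedule
      .compile
      .drain
      .unsafeRunSync()
}
```

The relative order of the printed ids within each period is not guaranteed, because the inner streams run independently. For an infinite source of ids, parJoin with an explicit bound may be preferable to parJoinUnbounded, so that the number of concurrently running inner streams stays limited.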