
How to enqueue elements to a queue and then dequeue them?


Imports and implicits for the following snippets:

import cats.effect.{ConcurrentEffect, ContextShift, IO, Timer}
import fs2.Stream
import fs2.concurrent.Queue

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._

implicit val ec: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global
implicit val cs: ContextShift[IO] = IO.contextShift(ec)
implicit val c: ConcurrentEffect[IO] = IO.ioConcurrentEffect
implicit val t: Timer[IO] = IO.timer(ec)

The goal is to add the integers 1 to 3 to an fs2 Queue and then dequeue them. The first attempt:

val s1 = for {
  q <- Stream.eval(Queue.noneTerminated[IO, Int]) // Create unbounded queue that terminates when it gets None
  _ <- Stream(1,2,3).map(Some(_)).through(q.enqueue) // Enqueue 1 to 3
  _ <- Stream.eval(q.enqueue1(None)) // Terminate queue
  outStream <- q.dequeue // Get dequeue stream
} yield outStream

s1.compile.toList.unsafeRunSync()

This returns List(1). Not sure why.

Second attempt:

val s2 = for {
  q <- Stream.eval(Queue.noneTerminated[IO, Int]) // Create unbounded queue that terminates when it gets None
  _ <- Stream(
    Stream(1,2,3).map(Some(_)).through(q.enqueue), // Enqueue 1 to 3
    Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None)) // Wait 1 second and terminate stream
  ).parJoin(2) // Run both streams in parallel
  outStream <- q.dequeue // Get dequeue stream
} yield outStream 

s2.compile.toList.unsafeRunSync()

This returns List(1,2). Also not sure why.

Why did these examples return what they returned? What is the proper way to do it?


Solution

  • Take a look at what you actually defined here:

    val s1 = for {
      q <- Stream.eval(Queue.noneTerminated[IO, Int])
      _ <- Stream(1,2,3).map(Some(_)).through(q.enqueue)
      _ <- Stream.eval(q.enqueue1(None))
      outStream <- q.dequeue // Get dequeue stream
    } yield outStream
    

    This is the same as:

    Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>
    
      Stream(1,2,3).map(Some(_)).through(q.enqueue).flatMap { _ =>
    
        Stream.eval(q.enqueue1(None)).flatMap { _ =>
          q.dequeue
        }
      }
    }
    
    • You create a queue inside a Stream.
    • Then, for each element of that stream (for each Queue), you enqueue three Some elements. This happens lazily: the side effect for an element is not evaluated until that element is needed.
    • Then, for each enqueued element, you also put None into the queue.
    • So the queue effectively receives Some(1), None, Some(2), None, Some(3), None, with the dequeue step nested after each None. Since this is a None-terminated Queue, it stops after the first None, and you end up with Stream(1), which evaluates to List(1).

    Meanwhile, in the second example you have

    val s2 = for {
      q <- Stream.eval(Queue.noneTerminated[IO, Int]) 
      _ <- Stream(
        Stream(1,2,3).map(Some(_)).through(q.enqueue), 
        Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None)) 
      ).parJoin(2) 
      outStream <- q.dequeue 
    } yield outStream 
    

    which is equal to

    Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>
    
      Stream(
        Stream(1,2,3).map(Some(_)).through(q.enqueue), 
        Stream.sleep(1.seconds) >> Stream.eval(q.enqueue1(None)) 
      ).parJoin(2).flatMap { _ =>
        q.dequeue 
      }
    }
    
    • You create a queue inside a Stream.
    • Then, for each element of that stream (for each Queue), you create a smaller Stream:
      • you enqueue Some(1), Some(2), Some(3) lazily in one stream
      • you enqueue None in another
      • you combine both streams non-deterministically, so there is no strong guarantee about when the sequence of Some values will be interrupted by None. In your test, circumstances made it happen after two elements of the first stream had been evaluated.

    Your basic error is using Stream[IO, *] and IO interchangeably: when you flatMap on a Stream, the next line is NOT evaluated once after all elements of the previous line have been evaluated, but once for every single element of the previous line. After all, a Stream is a (lazy, side-effecting) collection, so in a for-comprehension it behaves more like a (Lazy)List when it comes to order of operations.
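
The (Lazy)List analogy can be checked without fs2 at all. The following is a minimal sketch, assuming Scala 2.13 or later (for `LazyList`); the names `FlatMapOrderDemo`, `step` and `run` are made up for illustration, and the log strings only mimic the enqueue calls from the question rather than touching a real queue.

```scala
import scala.collection.mutable.ListBuffer

object FlatMapOrderDemo {
  // Returns the order in which the "side effects" ran.
  def run(): List[String] = {
    val log = ListBuffer.empty[String]

    // A one-element lazy collection that records a message when it is evaluated.
    def step(msg: String): LazyList[Unit] =
      LazyList(()).map { _ => log += msg; () }

    val program = for {
      n <- LazyList(1, 2, 3)
      _ <- step(s"enqueue Some($n)") // runs once per element n
      _ <- step("enqueue None")      // also runs once per element n, not once at the end
    } yield n

    program.toList // force the lazy collection so the steps actually run
    log.toList
  }
}

// FlatMapOrderDemo.run() returns:
// List("enqueue Some(1)", "enqueue None",
//      "enqueue Some(2)", "enqueue None",
//      "enqueue Some(3)", "enqueue None")
```

The "terminate" step is interleaved with every element, which is the same shape as the Some(1), None, Some(2), None, ... sequence in the first attempt.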

    If you didn't evaluate everything in Stream, it would work as you expect:

    import fs2.concurrent.NoneTerminatedQueue // needed in addition to the imports from the question

    val createQueue: IO[NoneTerminatedQueue[IO, Int]] = Queue.noneTerminated[IO, Int]
    
    def enqueueValues(queue: NoneTerminatedQueue[IO, Int]) =
      Stream(1,2,3).map(Some(_)).through(queue.enqueue) ++ Stream.eval(queue.enqueue1(None))
    
    def dequeueValues(queue: NoneTerminatedQueue[IO, Int]) =
      queue.dequeue
    
    // these are IO, not Stream[IO, *] !!!
    val io = for {
      queue <- createQueue
      _ <- enqueueValues(queue).compile.drain.start
      result <- dequeueValues(queue).compile.toList
    } yield result
    
    io.unsafeRunSync()
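
If you would rather stay inside Stream, one possible alternative is to run the producer in the background of the dequeue stream with `concurrently`. This is only a sketch, assuming the same fs2 2.x / cats-effect 2 setup as in the question; the names `ConcurrentlySketch`, `s3`, `producer` and `run` are made up for illustration.

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.Queue

import scala.concurrent.ExecutionContext

object ConcurrentlySketch {
  // Assumption: cats-effect 2, where Concurrent[IO] is derived from a ContextShift[IO].
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val s3: Stream[IO, Int] =
    Stream.eval(Queue.noneTerminated[IO, Int]).flatMap { q =>
      // Enqueue 1 to 3, then None. `++` appends the terminating step once,
      // after the whole enqueue stream has finished, instead of once per element.
      val producer =
        Stream(1, 2, 3).map(Some(_)).through(q.enqueue) ++ Stream.eval(q.enqueue1(None))

      // Dequeue in the foreground while the producer runs in the background.
      q.dequeue.concurrently(producer)
    }

  def run(): List[Int] = s3.compile.toList.unsafeRunSync()
}
```

The important difference from the first attempt is `++` instead of a further flatMap step: the None is enqueued once, after all three values, so the dequeue stream sees every element before it terminates.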