Tags: scala, queue, sequential, fs2

My fs2 Scala program is not executing as expected


I'm trying to pass the elements of a stream from one queue to another sequentially, so that each queue receives a single element at a time. Whatever is pushed to the first queue should be pulled and passed on to the next queue for computation. The code doesn't raise any error, but it stops after the first queue (pulling and pushing). Can anyone explain what I am missing?

import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2.Stream
import fs2.concurrent.Queue

import scala.concurrent.duration._
import scala.util.Random

class StreamTypeIntToDouble(q1: Queue[IO, Int], q2: Queue[IO, String])(
  implicit timer: Timer[IO]
) {
  val streamData = Stream(1, 2, 3).covary[IO]
  val scheduledStream = Stream.fixedDelay(10.seconds) >> streamData

  def storeInQueueFirst: Stream[IO, Unit] = {
    scheduledStream
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue First")))
      .metered(Random.between(1, 20).seconds)
      .through(q1.enqueue)

  }
  def getFromQueueFirst: Stream[IO, Int] = {
    q1.dequeue
      .evalTap(n => IO.delay(println(s"Pulling from queue First $n")))

  }

  def storeInQueueSecond(s: Stream[IO, Int]): Stream[IO, Unit] = {
    s.map { n =>
        n.toString
      }
      .metered(Random.between(1, 20).seconds)
      .evalTap(n => IO.delay(println(s"Pushing to queue second $n")))
      .through(q2.enqueue)
  }

  def getFromQueueSecond: Stream[IO, Unit] = {
    q2.dequeue
      .evalMap(_ => IO.delay(println("Pulling element from queue second")))
  }

}

object FiveTest extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q1 <- Queue.bounded[IO, Int](10)
      q2 <- Queue.bounded[IO, String](10)

      b = new StreamTypeIntToDouble(q1, q2)
      _ <- b.storeInQueueFirst.compile.drain.start
      a <- b.getFromQueueFirst.compile.lastOrError
      _ <- b.storeInQueueSecond(Stream(a)).compile.drain
      _ <- b.getFromQueueSecond.compile.drain
    } yield ()
    program.as(ExitCode.Success)
  }
}

The actual output is:

Pushing 1 to Queue First
Pushing 2 to Queue First
Pushing 3 to Queue First
Pulling from queue First 1
Pulling from queue First 2
Pulling from queue First 3

The expected output is:

Pushing 1 to Queue First
Pushing 2 to Queue First
Pushing 3 to Queue First
Pulling from queue First 1
Pulling from queue First 2
Pulling from queue First 3
Pushing to queue second 1
Pushing to queue second 2
Pushing to queue second 3
Pulling from queue second 1
Pulling from queue second 2
Pulling from queue second 3

Solution

  • You seem to be missing a key point about how streams work.

    When you run stream.compile.drain (or any other compile operation, such as compile.lastOrError), the thread/fiber/whatever is handling this stream waits until the stream terminates.

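    So when you have

    a <- b.getFromQueueFirst.compile.lastOrError

    you are effectively blocking: lastOrError can only return after the stream has terminated, but q1.dequeue never terminates on its own, so it keeps waiting for more elements and storeInQueueSecond never starts. The same applies to _ <- b.storeInQueueSecond(Stream(a)).compile.drain and every other step of the for-comprehension: the next operation won't start until the stream in the current step has read all of its elements, considers itself drained, and completes. (And a stream run that has completed cannot be resumed; running the stream again starts a fresh run from the beginning.)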
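    To see this rule in isolation, here is a minimal sketch assuming fs2 2.x and cats-effect 2, the same versions the question uses (fs2.concurrent.Queue and Timer). The object name LastOrErrorDemo is hypothetical. Calling compile.lastOrError directly on q.dequeue would wait forever; bounding it with .take(3) makes the stream finite, so the last element can be returned.

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

object LastOrErrorDemo {
  // Outside IOApp, cats-effect 2 needs an explicit ContextShift to derive Concurrent[IO]
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val lastOfThree: IO[Int] =
    for {
      q <- Queue.bounded[IO, Int](10)
      // Enqueue 1, 2, 3; this source stream is finite, so drain returns
      _ <- Stream(1, 2, 3).covary[IO].through(q.enqueue).compile.drain
      // q.dequeue.compile.lastOrError would never return, because dequeue
      // never terminates; take(3) makes the stream end after 3 elements
      last <- q.dequeue.take(3).compile.lastOrError
    } yield last

  def main(args: Array[String]): Unit =
    println(lastOfThree.unsafeRunSync())
}
```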

    If you want to:

    • put things on queue 1
    • subscribe to queue 1 and push each element to queue 2
    • subscribe to queue 2

    you should either:

    • run these steps concurrently; this can be achieved with fibers (.start runs a computation in a new fiber)
    • write the queues in such a way that each stream closes once all elements have been received and emitted. The condition "all elements have been received" has to be defined explicitly, e.g. with a None-terminated queue/stream, where you send Some(value) for as long as the queue should stay open and None to finally close it
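    The second option can be sketched with a None-terminated queue. This is a minimal sketch assuming fs2 2.x (Queue.boundedNoneTerminated and Stream#noneTerminate) and cats-effect 2; the object name SequentialStages and the Ref-based log, which exists only so the order of the messages can be checked, are illustrative additions. Each stage is compiled and drained before the next one starts, which reproduces the expected output from the question.

```scala
import cats.effect.{ContextShift, IO}
import cats.effect.concurrent.Ref
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

object SequentialStages {
  // Outside IOApp, cats-effect 2 needs an explicit ContextShift to derive Concurrent[IO]
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Runs the stages one after another and returns the logged lines in order
  val program: IO[Vector[String]] =
    for {
      log <- Ref.of[IO, Vector[String]](Vector.empty)
      say  = (msg: String) => log.update(_ :+ msg).flatMap(_ => IO(println(msg)))

      // Enqueue Some(a) to send a value, None to close the queue
      q1 <- Queue.boundedNoneTerminated[IO, Int](10)
      q2 <- Queue.boundedNoneTerminated[IO, String](10)

      // Stage 1: push 1, 2, 3 into q1, followed by None (added by noneTerminate)
      _ <- Stream(1, 2, 3).covary[IO]
             .evalTap(n => say(s"Pushing $n to Queue First"))
             .noneTerminate
             .through(q1.enqueue)
             .compile.drain

      // Stage 2: q1.dequeue completes when it reaches the None,
      // so collecting it into a List returns instead of waiting forever
      pulled <- q1.dequeue
                  .evalTap(n => say(s"Pulling from queue First $n"))
                  .compile.toList

      // Stage 3: push the collected values into q2, again closed with None
      _ <- Stream.emits(pulled).covary[IO]
             .map(_.toString)
             .evalTap(s => say(s"Pushing to queue second $s"))
             .noneTerminate
             .through(q2.enqueue)
             .compile.drain

      // Stage 4: drain q2; this also terminates thanks to the None
      _ <- q2.dequeue
             .evalTap(s => say(s"Pulling from queue second $s"))
             .compile.drain

      lines <- log.get
    } yield lines

  def main(args: Array[String]): Unit = {
    program.unsafeRunSync()
    ()
  }
}
```

    Because nothing reads q1 until stage 1 has finished, each queue's capacity must be large enough to hold all elements at once; with a smaller bound, stage 1 would wait forever on a full queue.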

    Only the second way would give you output ordered like that, but it would also somewhat defeat the purpose of using streams, since the program would just be: push elements to a queue, close the stream, pull elements, close the stream, push elements, close the stream, pull elements, close the stream. So it would make sense only as an exercise. If you went the first way, it would be something like:

    object FiveTest extends IOApp {
      override def run(args: List[String]): IO[ExitCode] = {
        for {
          q1 <- Queue.bounded[IO, Int](10)
          q2 <- Queue.bounded[IO, String](10)
    
          b = new StreamTypeIntToDouble(q1, q2)
    
          // filling q1 runs in a separate fiber so that it doesn't block the next line
          _ <- b.storeInQueueFirst.compile.drain.start
          // output from q1 is redirected to q2, and this stream also runs in a separate fiber
          // so that it doesn't block the next operation
          _ <- b.storeInQueueSecond(b.getFromQueueFirst).compile.drain.start
          // the stream reading from q2 blocks until it completes (a queue stream never does)...
          _ <- b.getFromQueueSecond.compile.drain
          // ... so ExitCode is not returned while the program is still supposed to run
        } yield ExitCode.Success
      }
    }
    Output (the stages now interleave, because they run concurrently):
    Pushing 1 to Queue First
    Pushing 2 to Queue First
    Pulling from queue First 1
    Pushing to queue second 1
    Pulling element from queue second
    Pulling from queue First 2
    Pushing to queue second 2
    Pulling element from queue second
    ...
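    As an alternative to starting fibers by hand with .start, the stages can also be combined with fs2's Stream#concurrently, which runs a background stream alongside the main one and cancels it when the main stream finishes, so no fiber is left running. This is a swapped-in technique, not the one used above. The sketch assumes fs2 2.x and cats-effect 2, and the names ConcurrentStages and pipeline are hypothetical. The consumer takes a fixed number of elements only so that the example terminates.

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

object ConcurrentStages {
  // Outside IOApp, cats-effect 2 needs an explicit ContextShift to derive Concurrent[IO]
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // producer -> q1 -> transformer -> q2 -> consumer, all running at the same time
  def pipeline(n: Int): IO[List[String]] =
    for {
      q1 <- Queue.bounded[IO, Int](10)
      q2 <- Queue.bounded[IO, String](10)
      producer    = Stream.range(1, n + 1).covary[IO].through(q1.enqueue)
      transformer = q1.dequeue.map(_.toString).through(q2.enqueue)
      // concurrently runs its argument in the background and cancels it when the
      // left-hand stream ends; take(n) is what makes the left-hand stream end here
      out <- q2.dequeue
               .take(n.toLong)
               .concurrently(transformer)
               .concurrently(producer)
               .compile
               .toList
    } yield out

  def main(args: Array[String]): Unit =
    println(pipeline(3).unsafeRunSync())
}
```

    Without the take, the consumer would wait on q2 forever, just like the fiber-based version above; that is fine for a long-running service, but then the background stages are only cancelled when the whole program is interrupted.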