Search code examples
scala queue fs2

Why is my code not printing anything? (Scala, fs2)


The program is meant to map Ints to Doubles, push them onto a queue, and record the time at which each element leaves the queue. The program does not show any error, but it is not printing anything. What am I missing?

import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2._
import fs2.concurrent.Queue

import scala.concurrent.duration._
import scala.util.Random
// Pairs a periodic producer pipeline with a consumer pipeline over the queue `q1`.
// `q1` carries (Double, IO[Long]) pairs; `timer` supplies the clock and the sleeps.
class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {

  // Single-element stream holding the Int 1.
  val streamData = Stream.emit(1)
  // A 10-second fixed-delay stream sequenced (`>>`) with streamData.
  val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData

  // NOTE(review): this body contains two separate stream expressions. Only the
  // last one (the q1.dequeue pipeline) is the result of the block; the
  // scheduledStream pipeline before it is constructed and then discarded, so
  // the stream returned from here never enqueues anything into q1.
  def storeInQueue: Stream[IO, Unit] = {
    // Producer pipeline: its value is not used by anything below.
    scheduledStream
      .map { n =>
        // entryTime is the clock IO itself (an IO[Long]); it is not run here.
        val entryTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n.toDouble, entryTime)
      }
      .through(q1.enqueue)
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))

    // Consumer pipeline: this is the value storeInQueue actually returns.
    q1.dequeue
      // Sleeps for a random 10 to 29 seconds per dequeued element.
      .evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
      .map { n =>
        // exitTime is likewise the clock IO, paired with the value unevaluated.
        val exitTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n._1, exitTime)
      }
      .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
  }
}

object Five2 extends IOApp {

  /** Creates a bounded queue of capacity 1, wraps it in a [[Tst]], drains the
    * stream returned by `storeInQueue`, and reports success once it finishes.
    */
  override def run(args: List[String]): IO[ExitCode] =
    Queue
      .bounded[IO, (Double, IO[Long])](1)
      .flatMap { queue =>
        val tst = new Tst(queue)
        tst.storeInQueue.compile.drain
      }
      .as(ExitCode.Success)
}

Solution

  • IO is evaluated lazily: for something to be executed, it has to be part of the expression that builds the final IO value.

    Here:

      def storeInQueue: Stream[IO, Unit] = {
        scheduledStream ... // no side effects are run when we create this!
    
        q1.dequeue ... // not using scheduledStream
      }
    

    The pipeline built from scheduledStream is not used at all, so it is not part of the value returned from storeInQueue. When IOApp turns the final IO value into running computations, the recipe for your program does not contain the part where messages are pushed to the queue, so the queue is always empty.

    The part which subscribes to the queue works, but since nothing ever lands on the queue, it keeps waiting for items that will never arrive.

    You would have to start both streams by "making them part of one IO value", e.g. like this:

    /** Producer and consumer pipelines over a shared queue.
      *
      * @param q1    queue carrying each value together with an effect yielding its entry time
      * @param timer source of the wall clock and of the sleeps used by both pipelines
      */
    class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {

      // Wall-clock time in whole seconds; the clock is read each time this IO is run.
      private val nowSeconds: IO[Long] =
        timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)

      val streamData = Stream.emit(1)
      val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData

      /** Producer: for every element of `scheduledStream`, reads the clock, logs
        * the (value, entryTime) pair and enqueues it into `q1`.
        */
      def storeInQueue: Stream[IO, Unit] =
        scheduledStream
          // evalMap actually runs the clock; a plain map would only carry the
          // unevaluated IO along, so no timestamp would ever be taken.
          .evalMap(n => nowSeconds.map(entryTime => (n.toDouble, entryTime)))
          // Log before enqueueing: q1.enqueue emits Unit, so logging after it
          // would print "()" instead of the element.
          .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
          // The queue's element type wants an IO[Long]; wrap the reading already taken.
          .map { case (value, entryTime) => (value, IO.pure(entryTime)) }
          .through(q1.enqueue)

      /** Consumer: dequeues from `q1`, waits a random 10 to 29 seconds per
        * element, then reads the clock and logs the (value, exitTime) pair.
        */
      def takeFromQueue: Stream[IO, Unit] =
        q1.dequeue
          // Suspend the random draw so it happens when the stream runs.
          .evalTap { _ =>
            IO.delay(Random.between(10, 30)).flatMap(delay => timer.sleep(delay.seconds))
          }
          // Run the clock after the sleep so exitTime is a real timestamp.
          .evalMap { case (value, _) => nowSeconds.map(exitTime => (value, exitTime)) }
          .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
    }
    
    object Five2 extends IOApp {

      /** Creates a bounded queue of capacity 1 and runs the producer and the
        * consumer of a [[Tst]] side by side until the consumer stream ends.
        *
        * @param args command-line arguments (unused)
        * @return `ExitCode.Success` once the combined stream has terminated
        */
      override def run(args: List[String]): IO[ExitCode] = {
        val program = for {
          q <- Queue.bounded[IO, (Double, IO[Long])](1)
          b = new Tst(q)
          // Starting two fibers and yielding immediately lets `run` complete,
          // so the application exits before either stream does any work.
          // `concurrently` keeps both streams inside the one IO that is awaited:
          // the producer runs in the background of the consumer, and a failure
          // in either one fails the whole program.
          _ <- b.takeFromQueue.concurrently(b.storeInQueue).compile.drain
        } yield ()
        program.as(ExitCode.Success)
      }
    }