I'm trying to pass the elements of a stream from one queue to another sequentially, so that each queue receives a single element at a time. The result of pushing to a queue should be pulled and passed to the next queue for computation. The code does not show any execution error, but it stops after the first queue (pushing and pulling). Can anyone explain what I am missing?
import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random
class StreamTypeIntToDouble(q1: Queue[IO, Int], q2: Queue[IO, String])(
    implicit timer: Timer[IO]
) {
  val streamData = Stream(1, 2, 3).covary[IO]
  val scheduledStream = Stream.fixedDelay(10.seconds) >> streamData
  def storeInQueueFirst: Stream[IO, Unit] = {
    scheduledStream
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue First")))
      .metered(Random.between(1, 20).seconds)
      .through(q1.enqueue)
  }
  def getFromQueueFirst: Stream[IO, Int] = {
    q1.dequeue
      .evalTap(n => IO.delay(println(s"Pulling from queue First $n")))
  }
  def storeInQueueSecond(s: Stream[IO, Int]): Stream[IO, Unit] = {
    s.map { n =>
        n.toString
      }
      .metered(Random.between(1, 20).seconds)
      .evalTap(n => IO.delay(println(s"Pushing to queue second $n")))
      .through(q2.enqueue)
  }
  def getFromQueueSecond: Stream[IO, Unit] = {
    q2.dequeue
      .evalMap(_ => IO.delay(println("Pulling element from queue second")))
  }
}
object FiveTest extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q1 <- Queue.bounded[IO, Int](10)
      q2 <- Queue.bounded[IO, String](10)
      b = new StreamTypeIntToDouble(q1, q2)
      _ <- b.storeInQueueFirst.compile.drain.start
      a <- b.getFromQueueFirst.compile.lastOrError
      _ <- b.storeInQueueSecond(Stream(a)).compile.drain
      _ <- b.getFromQueueSecond.compile.drain
    } yield ()
    program.as(ExitCode.Success)
  }
}
The output is given as:
Pushing 1 to Queue First
Pushing 2 to Queue First
Pushing 3 to Queue First
Pulling from queue First 1
Pulling from queue First 2
Pulling from queue First 3
The expected output is:
Pushing 1 to Queue First
Pushing 2 to Queue First
Pushing 3 to Queue First
Pulling from queue First 1
Pulling from queue First 2
Pulling from queue First 3
Pushing to queue second 1
Pushing to queue second 2
Pushing to queue second 3
Pulling from queue second 1
Pulling from queue second 2
Pulling from queue second 3
You seem to not understand some things about how streams work.
When you run stream.compile.drain, it means that the thread/fiber/whatever is handling this stream will wait until the stream terminates.
So when you have
_ <- b.storeInQueueSecond(Stream(a)).compile.drain
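you are effectively blocking, and the next operation won't start until the stream in this step has read all its elements, considers itself drained, and closes. (Once a stream is closed there is no way to make it run again; you have to create a new one.) The same holds one line earlier in your program: q1.dequeue never terminates on its own, so b.getFromQueueFirst.compile.lastOrError never produces a value, which is why the output stops after the first queue.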
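To make the "waits until the stream terminates" point concrete, here is a minimal sketch, assuming fs2 2.x and cats-effect 2.x as in the question. The names DrainDemo and firstThree are hypothetical and only serve the illustration. It enqueues three elements and then reads exactly three back; the take(3) is what makes the dequeue stream finite.

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

// Hypothetical demo object, not part of the original question.
object DrainDemo {
  // Outside of IOApp we have to provide a ContextShift ourselves.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val firstThree: IO[List[Int]] =
    for {
      q <- Queue.bounded[IO, Int](10)
      // A finite source: this drain completes once all three elements are enqueued.
      _ <- Stream(1, 2, 3).covary[IO].through(q.enqueue).compile.drain
      // q.dequeue alone never terminates; take(3) ends the stream after three elements.
      // Without it, compiling this stream would never return.
      xs <- q.dequeue.take(3).compile.toList
    } yield xs
}
```

Running DrainDemo.firstThree.unsafeRunSync() should complete, whereas the same code without take(3) would hang on the last step, just like lastOrError does in the question.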
If you want elements to flow from the first queue into the second one and out again, you should either:
- start each stream in a separate fiber so that they run in parallel (.start is used to start a computation in a new fiber), or
- make each stream terminate before the next one starts by closing its queue explicitly (push Some(value) for as long as you would like the queue to stay open, and None to finally close it).
Only the second way would give you such sorted output, but it would also kind of defeat the purpose of using streams, as it would just be: push elements to queue, close stream, pull elements, close stream, push elements, close stream, pull elements, close stream. So it would make sense only as an exercise. If you went the first way, it would be something like
object FiveTest extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    for {
      q1 <- Queue.bounded[IO, Int](10)
      q2 <- Queue.bounded[IO, String](10)
      b = new StreamTypeIntToDouble(q1, q2)
      // the stream feeding q1 is started in a separate fiber to not block the next line
      _ <- b.storeInQueueFirst.compile.drain.start
      // output from q1 is redirected to q2 and the stream is run in a separate fiber
      // to not block running the next operation
      _ <- b.storeInQueueSecond(b.getFromQueueFirst).compile.drain.start
      // the stream from q2 is blocking until the stream completes...
      _ <- b.getFromQueueSecond.compile.drain
      // ... so that we won't return ExitCode if the program is still supposed to run
    } yield ExitCode.Success
  }
}
This prints something like:
Pushing 1 to Queue First
Pushing 2 to Queue First
Pulling from queue First 1
Pushing to queue second 1
Pulling element from queue second
Pulling from queue First 2
Pushing to queue second 2
Pulling element from queue second
...
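For completeness, the second way (closing each queue with None) could be sketched roughly as follows. This is only an illustration under assumptions: it uses fs2 2.x's Queue.noneTerminated and Stream#noneTerminate, drops the delays and printlns from the question, and the names NoneTerminatedDemo and result are hypothetical.

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

// Hypothetical demo object, not part of the original answer.
object NoneTerminatedDemo {
  // Outside of IOApp we have to provide a ContextShift ourselves.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val result: IO[List[String]] =
    for {
      // Unbounded queues that are closed by enqueueing None.
      q1 <- Queue.noneTerminated[IO, Int]
      q2 <- Queue.noneTerminated[IO, String]
      // noneTerminate wraps every element in Some and appends a final None,
      // so q1 is closed once all elements have been pushed.
      _ <- Stream(1, 2, 3).covary[IO].noneTerminate.through(q1.enqueue).compile.drain
      // q1.dequeue ends when it reaches the None, so this step can finish;
      // the same trick then closes q2.
      _ <- q1.dequeue.map(_.toString).noneTerminate.through(q2.enqueue).compile.drain
      // q2.dequeue also terminates at the None, so the whole program completes.
      out <- q2.dequeue.compile.toList
    } yield out
}
```

Each step here runs strictly after the previous one has finished, which is what gives the fully sorted "push everything, then pull everything" order, and also why this variant is mostly useful as an exercise.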