scala, zio, zio-streams

How to read messages from ZHub via ZStream?


I am new to ZHub and ZStream and wanted to familiarize myself with their APIs.

Unfortunately, I could not make this simple example work:

for
    hub <- Hub.bounded[String](4)
    stream = ZStream.fromHub(hub)
    _ <- hub.publish("Hello")
    _ <- hub.publish("World")
    collected <- stream.runCollect
    _ <- ZIO.foreach(collected) { msg => console.putStrLn(msg) }
yield
    ()

This program does not terminate. I suspect this is because I am trying to collect an infinite stream. I have also tried printing the messages with stream.tap(...) and shutting down the hub, but neither helped.

What am I missing here? Any help is appreciated, thanks.


Solution

  • @adamgfraser kindly provided a working example on GitHub:

    import zio._
    import zio.stream._
    
    object Example extends App {
    
      def run(args: List[String]): URIO[ZEnv, ExitCode] =
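        for {
          // Completed once the stream has subscribed to the hub.
          promise <- Promise.make[Nothing, Unit]
          hub     <- Hub.bounded[String](2)
          // Subscribe first, signal via the promise, then read from the subscription queue.
          stream = ZStream.managed(hub.subscribe).flatMap { queue =>
                     ZStream.fromEffect(promise.succeed(())) *>
                       ZStream.fromQueue(queue)
                   }
          // take(2) ends the otherwise unbounded stream after two messages.
          fiber     <- stream.take(2).runCollect.fork
          // Publish only after the subscription is in place.
          _         <- promise.await
          _         <- hub.publish("Hello")
          _         <- hub.publish("World")
          collected <- fiber.join
          _         <- ZIO.foreach(collected)(console.putStrLn(_)).orDie
        } yield ExitCode.success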
    }
    

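    My mistake was to publish values to the hub before waiting for the subscription to complete. A hub only delivers a message to subscribers that exist at the moment it is published, and ZStream.fromHub does not subscribe until the stream is actually run. In my original code, "Hello" and "World" were therefore published before anyone was listening. The take(2) matters as well: a stream backed by a hub does not end on its own, so runCollect without it never returns.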
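
    For comparison, here is a minimal sketch of the same "subscribe first, then publish" ordering without a fiber or a promise. It is not part of the example above; it assumes ZIO 1.x (the version the code above targets), and the names SubscribeFirst and program are made up for illustration. The subscription is opened with hub.subscribe.use, so both publishes happen while the subscriber already exists:

```scala
import zio._
import zio.stream._

// Hypothetical object name, for illustration only (assumes ZIO 1.x).
object SubscribeFirst extends App {

  // Subscribe, then publish, then read exactly two messages.
  val program: UIO[Chunk[String]] =
    for {
      hub       <- Hub.bounded[String](2)
      collected <- hub.subscribe.use { queue =>
                     for {
                       // The subscription exists here, so both messages are delivered to it.
                       _    <- hub.publish("Hello")
                       _    <- hub.publish("World")
                       // take(2) bounds the stream so that runCollect can complete.
                       msgs <- ZStream.fromQueue(queue).take(2).runCollect
                     } yield msgs
                   }
    } yield collected

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    program
      .flatMap(msgs => ZIO.foreach_(msgs)(console.putStrLn(_)).orDie)
      .exitCode
}
```

    This only works so simply because the hub capacity (2) is large enough to hold both messages without the publisher blocking. With concurrent publishers and consumers, the fiber-plus-promise approach above is the more general pattern.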