How to listen for Redis list events with Redisson and ZIO


With Redisson, receiving an event whenever a new item is added to a list is simple. All you need to do is the following:

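import org.redisson.Redisson
import org.redisson.api.listener.ListAddListener
import org.redisson.client.codec.StringCodec

object Test extends App {
  val redisson = Redisson.create()
  val events = redisson.getQueue[String]("minio_events", new StringCodec())
  // Print the listener argument each time an item is added
  events.addListener(new ListAddListener() {
    override def onListAdd(o: String): Unit = println(o)
  })
}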

The difficulty begins when this needs to be wrapped in ZIO. How can I wrap this listener in a ZIO effect or a ZStream so that it starts a chain of event processing?


Solution

  • It looks like Redisson supports converting the RedissonClient into a Reactive Streams client, for which there is a ZIO interop module. But if you just want to work directly with the Java interface, I think you can do something like this (note that I haven't actually tested this):

    import org.redisson.Redisson
    import org.redisson.api.listener.ListAddListener
    import org.redisson.client.codec.StringCodec
    import zio._
    import zio.stream.ZStream

    object Test extends zio.App {
      def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] =
        (for {
          // Construct the client in the scope of the stream so it shuts down when done
          c <- ZStream.managed(ZManaged.makeEffect(Redisson.create())(_.shutdown()))

          // Variant of effectAsync* that lets you specify an interrupter
          s <- ZStream.effectAsyncInterrupt[Any, Nothing, String] { k =>
                 val queue = c.getQueue[String]("", new StringCodec())
                 val listenerId = queue.addListener(new ListAddListener {

                   // Invoke the callback by passing in a ZIO with a single chunk
                   def onListAdd(name: String): Unit = k(ZIO.succeed(Chunk.single(name)))
                 })

                 // Return a cancellation handler.
                 Left(UIO(queue.removeListener(listenerId)))
               }
        } {
          // No `yield`: the body runs for every emitted element (desugars to `foreach`)
          zio.console.putStrLn(s)
        }).exitCode
    }
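
If you want to try the callback-to-stream bridging without a running Redis instance, something like the following minimal sketch may help. It assumes ZIO 1.x (the same `effectAsyncInterrupt` API as above; ZIO 2 renamed it to `ZStream.asyncInterrupt`). `FakeEvents`, `ListenerBridge` and `BridgeDemo` are hypothetical names invented for this illustration and are not part of Redisson or ZIO; `FakeEvents` only mimics the `addListener`/`removeListener` shape of the Redisson queue.

```scala
import zio._
import zio.stream.ZStream

// Hypothetical stand-in for a callback-based source such as a Redisson queue.
// It only mimics the addListener/removeListener shape; it is not a Redisson type.
final class FakeEvents {
  private var listeners = Map.empty[Int, String => Unit]
  private var nextId    = 0

  // Registers a callback and returns an id that can be used to remove it
  def addListener(f: String => Unit): Int = synchronized {
    nextId += 1
    listeners += (nextId -> f)
    nextId
  }

  def removeListener(id: Int): Unit = synchronized { listeners -= id }

  def listenerCount: Int = synchronized(listeners.size)

  // Simulates the source firing an event to every registered listener
  def emit(value: String): Unit = {
    val snapshot = synchronized(listeners.values.toList)
    snapshot.foreach(listener => listener(value))
  }
}

object ListenerBridge {
  // Wraps the callback API in a stream. The effect in Left(...) is the
  // canceler, which removes the listener when the stream is finalized.
  def events(source: FakeEvents): ZStream[Any, Nothing, String] =
    ZStream.effectAsyncInterrupt[Any, Nothing, String] { cb =>
      val id = source.addListener { value =>
        cb(ZIO.succeed(Chunk.single(value)))
        ()
      }
      Left(UIO(source.removeListener(id)))
    }
}

object BridgeDemo extends zio.App {
  def run(args: List[String]): URIO[ZEnv, ExitCode] = {
    val source = new FakeEvents
    (for {
      // Consume two events on a background fiber
      fiber <- ListenerBridge.events(source).take(2).runCollect.fork
      // Wait until the stream has registered its listener
      _     <- UIO(source.listenerCount).repeatUntil(_ > 0)
      _     <- UIO { source.emit("a"); source.emit("b") }
      out   <- fiber.join
      _     <- console.putStrLn(out.mkString(","))
    } yield ()).exitCode
  }
}
```

The point of the sketch is the shape of `ListenerBridge.events`: register the listener inside the `effectAsyncInterrupt` block, push each event through the callback as a single-element chunk, and return the deregistration effect as the canceler. Swapping `FakeEvents` for the real queue should give essentially the code above.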