With Radisson, simply receiving events is enough to add new items to the list. To do this, you need to do the following:
object Test extends App {
val redisson = Redisson.create()
val events = redisson.getQueue[String]("minio_events", new StringCodec())
events.addListener(new ListAddListener() {
override def onListAdd(o: String): Unit = println(o)
})
}
Difficulties begin when it needs to be wrapped in ZIO. How can I wrap this event in ZIO or ZStream to start the chain of event processing?
It looks like Redisson supports converting the RedissonClient
into a reactive streams client which there is a zio-interop for. But if you just want to work directly with the java interface I think you can do something like this (note I haven't actually tested this):
object Test extends zio.App {
def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] =
(for {
// Construct the client in the scope of the stream so it shuts down when done
c <- ZStream.managed(ZManaged.makeEffect(Redisson.create())(_.shutdown()))
// Variant of effectAsync* that lets you specify an interrupter
s <- ZStream.effectAsyncInterrupt[Any, Nothing, String] { k =>
val queue = c.getQueue[String]("", new StringCodec())
val listenerId = queue.addListener(new ListAddListener {
// Invoke the callback by passing in a ZIO with a single chunk
def onListAdd(name: String): Unit = k(ZIO.succeed(Chunk.single(name)))
})
// Return a cancellation handler.
Left(UIO(queue.removeListener(listenerId)))
}
} { zio.console.putStrLn(s) }).exitCode
}