Search code examples
javaredisproject-reactorlettuce

Subscribe to redis channel (pubsub) using lettuce reactive commands


I am using the io.lettuce.core library and I am having trouble subscribing to a channel using the RedisPubSubReactiveCommands interface.

I have a StatefulRedisPubSubConnection and an active redis cluster which I am attempting to subscribe to.

connection.sync().subscribe("channel") works fine, as does connection.async().subscribe("channel"). However, when I use the reactive 'hot observable' interface provided by lettuce like so:

connection.reactive().subscribe(channels).subscribe();
connection.reactive().observeChannels().doOnNext(this::notifyObservers).subscribe();

It will not register as a subscription action on redis. I feel like I'm following the example given in the lettuce documentation closely.

I'm programming for an interface that accepts a hot Flux Observable and I'm getting close to wrapping the sync or async connection interfaces with my own reactive wrapper and throwing them in the pipe. What am I doing wrong here?


Solution

  • In case anyone else runs into this same problem, it turns out I was passing in a Set<String> Object into a function that accepts a varargs Object... and didn't realize it was treating the entire collection as a single element instead of parsing it as a varargs array.

    I'll leave this up for others to learn from my dumb mistake.