While implementing GraphQL subscriptions, I came up with a poor man's pub/sub solution with custom topics. It looks as follows:
// The map is a custom class, just to handle multiple clients subscribed to the same data
// and we just create one FluxSink per client which is internally stored in a Set
private static final ConcurrentMultiMap<Long, FluxSink<String>> subscribers =
    new ConcurrentMultiMap<>();

public Publisher<String> subscribeToMessages(final Long id) {
  return Flux.create(
      newSubscriber ->
          subscribers.add(
              id,
              newSubscriber.onDispose(() -> subscribers.remove(id, newSubscriber))),
      OverflowStrategy.LATEST);
}

public void publish(final Long id, final String message) {
  Optional.of(id)
      .map(subscribers::get)
      .ifPresent(
          subscribers ->
              subscribers.forEach(
                  subscriber -> subscriber.next(message)));
}
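The `ConcurrentMultiMap` class itself is not shown in the post, so the following is only a hypothetical sketch of what such a class might look like, assuming it needs nothing more than the `add`, `remove`, and `get` calls used above and that `get` returns `null` for an unknown key (which is what the `Optional.map` call relies on). The internals are an assumption, built on `ConcurrentHashMap` from the JDK.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the custom ConcurrentMultiMap; the real class is not shown in the post.
final class ConcurrentMultiMap<K, V> {
    private final ConcurrentHashMap<K, Set<V>> map = new ConcurrentHashMap<>();

    // Adds a value under the key, creating the backing set on first use.
    // compute() runs atomically per key, so a concurrent remove() cannot drop the set in between.
    void add(K key, V value) {
        map.compute(key, (k, existing) -> {
            Set<V> target = (existing != null) ? existing : ConcurrentHashMap.<V>newKeySet();
            target.add(value);
            return target;
        });
    }

    // Removes a value; returning null from the remapping function deletes the key
    // once its set is empty, so get() reports "no subscribers" as null.
    void remove(K key, V value) {
        map.computeIfPresent(key, (k, existing) -> {
            existing.remove(value);
            return existing.isEmpty() ? null : existing;
        });
    }

    // Returns the live set of values for the key, or null if there are none.
    Set<V> get(K key) {
        return map.get(key);
    }

    public static void main(String[] args) {
        ConcurrentMultiMap<Long, String> subscribers = new ConcurrentMultiMap<>();
        subscribers.add(1L, "client-a");
        subscribers.add(1L, "client-b");
        subscribers.remove(1L, "client-a");
        System.out.println(subscribers.get(1L));
        subscribers.remove(1L, "client-b");
        System.out.println(subscribers.get(1L));
    }
}
```

In the question's code the value type would be `FluxSink<String>` rather than `String`; strings are used here only to keep the sketch free of external dependencies.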
Moving onward, I wanted to replace this implementation with a Redis-backed solution using the RedissonReactiveClient, as I stumbled upon the RTopicReactive::getMessages method, which returns a Flux. The previous code now looks as follows:
public Publisher<String> subscribeToMessages(final Long id) {
  val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
  return topic.getMessages(String.class);
}

public void publish(final Long id, final String message) {
  val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
  topic.publish(message);
}
Unfortunately, this doesn't work as expected. The updates are published correctly as far as I can tell (I connected to the Redis instance and SUBSCRIBEd to it once to check the data), and after my GraphQL client subscribes to the server, the Mono returned by publish indicates that there is one subscriber: my GraphQL client. However, the client isn't updating its data, and the network tab in the browser shows no WebSocket message carrying published data from the server. I therefore assume that the data exposed via the Flux in the subscribeToMessages method is malformed in some way.
Is there anything I have to keep in mind when attempting to use the getMessages method of the topic?
You forgot to invoke subscribe on both publishers. Reactive publishers are lazy: nothing is executed until something subscribes to them. It should be like this:
public void publish(final Long id, final String message) {
  val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
  topic.publish(message).doOnSuccess(res -> {
    // ...
  }).subscribe();
}
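// subscribe() returns a reactor.core.Disposable, so the method cannot be declared void
// while returning it; keep the Disposable to cancel the subscription later.
public Disposable subscribeToMessages(final Long id) {
  val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
  return topic.getMessages(String.class).doOnNext(res -> {
    // ...
  }).subscribe();
}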
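To illustrate why the missing subscribe matters, here is a minimal sketch of a cold publisher written against the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams contract that Reactor's `Mono` and `Flux` implement. It is an analogy only, not Redisson or Reactor code: `LazyPublishDemo`, `publish`, `requestAll`, and `sent` are hypothetical names, and it assumes the Redisson reactive calls behave like typical cold publishers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo: a stand-in for topic.publish(message), not the Redisson API.
final class LazyPublishDemo {
    // Records side effects so we can see when "publishing" actually happens.
    static final List<String> sent = new ArrayList<>();

    // Builds a cold publisher. Calling this method alone performs no work.
    static Flow.Publisher<Long> publish(String message) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return;
                }
                done = true;
                sent.add(message);      // the side effect runs only once demand arrives
                subscriber.onNext(1L);  // pretend one receiver got the message
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // A subscriber that requests everything and ignores the signals.
    static <T> Flow.Subscriber<T> requestAll() {
        return new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
    }

    public static void main(String[] args) {
        Flow.Publisher<Long> pending = publish("hello");
        System.out.println(sent.size()); // prints 0: assembling the publisher sent nothing
        pending.subscribe(requestAll());
        System.out.println(sent.size()); // prints 1: subscribing triggered the send
    }
}
```

The same reasoning applies to the `Flux` from `getMessages`: something has to subscribe to it, either your own code or the framework that receives the returned publisher.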