Tags: spring-webflux, project-reactor, reactor

Dynamic set of publishers all emitting through the same Flux


I am trying to build a kind of hub service that emits through a single hot Flux (the output), while also letting you register and unregister Flux producers/publishers (the inputs).

I know I can do something like:

    class Hub<T> {
        /**
         * @return unregister function
         */
        Function<Void, Void> registerProducer(final Flux<T> flux) { ... }

        Disposable subscribe(Consumer<? super T> consumer) {
            if (out == null) { 
                // obviously this will not work!
                out = Flux.merge(producer1, producer2, ...).share();
            }
            return out.subscribe(consumer);
        }
    }

... but as these "producers" are registered and unregistered, how do I add a new Flux source to the Flux that is already subscribed to? Or remove an unregistered source from it?

TIA!


Solution

  • Flux is immutable by design, so as you've implied in the question, there's no way to just "update" an existing Flux in situ.

    Usually I'd recommend avoiding using a Processor directly. However, this is one of the (rare-ish) cases where a Processor is probably the only sane option, since you essentially want to be publishing elements dynamically based on the producers that you're registering. Something similar to:

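    import reactor.core.Disposable;
    import reactor.core.publisher.DirectProcessor;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxProcessor;
    import reactor.core.publisher.FluxSink;

    class Hub<T> {

        // Note: processors are deprecated as of Reactor 3.4 in favor of the Sinks API.
        private final FluxProcessor<T, T> processor;
        private final FluxSink<T> sink;

        public Hub() {
            // serialize() guards against concurrent emissions from several producers.
            this.processor = DirectProcessor.<T>create().serialize();
            this.sink = processor.sink();
        }

        // Forwards every element of the producer into the shared sink.
        // Dispose the returned Disposable to unregister the producer.
        public Disposable registerProducer(Flux<T> flux) {
            return flux.subscribe(sink::next);
        }

        // The hot output: subscribers only see elements emitted after they subscribe.
        public Flux<T> read() {
            return processor;
        }
    }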
    

    If you want to remove a producer, then you can keep track of the Disposable returned from registerProducer() and call dispose() on it when done.
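
On Reactor 3.4 and later the processor classes are deprecated in favor of the Sinks API, so the same hub can probably be written against `Sinks.Many` instead. What follows is a minimal sketch, not a definitive implementation: it assumes reactor-core 3.4+ is on the classpath, and the class name `SinkHub`, the `emit` helper, and the choice of `directBestEffort()` (elements are dropped when nobody is subscribed or a subscriber has no demand) are illustrative assumptions rather than anything from the original answer. The `main` method also shows the register/dispose cycle described above.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// Hypothetical Sinks-based variant of the Hub above (assumes Reactor 3.4+).
class SinkHub<T> {

    // Hot multicast sink; best effort means elements are dropped for
    // subscribers without demand, and when there are no subscribers at all.
    private final Sinks.Many<T> sink = Sinks.many().multicast().directBestEffort();

    // Forward every element of the producer into the shared sink.
    // Dispose the returned Disposable to unregister the producer.
    public Disposable registerProducer(Flux<T> flux) {
        return flux.subscribe(this::emit);
    }

    // Sinks reject concurrent emissions, so producers running on different
    // threads are serialized here with a plain lock.
    private void emit(T value) {
        synchronized (sink) {
            sink.tryEmitNext(value); // result ignored: best effort
        }
    }

    // The hot output shared by all consumers.
    public Flux<T> read() {
        return sink.asFlux();
    }

    public static void main(String[] args) {
        SinkHub<String> hub = new SinkHub<>();
        List<String> received = new ArrayList<>();
        hub.read().subscribe(received::add);

        // Two manually driven producers, used here only for the demo.
        Sinks.Many<String> a = Sinks.many().multicast().directBestEffort();
        Sinks.Many<String> b = Sinks.many().multicast().directBestEffort();
        Disposable regA = hub.registerProducer(a.asFlux());
        hub.registerProducer(b.asFlux());

        a.tryEmitNext("a1");
        b.tryEmitNext("b1");
        regA.dispose();      // unregister producer a
        a.tryEmitNext("a2"); // no longer forwarded to the hub
        b.tryEmitNext("b2");

        System.out.println(received); // prints [a1, b1, b2]
    }
}
```

If producers should not be silently dropped while nobody is listening, `Sinks.many().multicast().onBackpressureBuffer()` or a replay sink may be a better fit, at the cost of buffering.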