Search code examples
springreactive-programmingproject-reactor

DirectProcessor and UnicastProcessor can subscribe to upstream Publisher when they shouldn't. Why?


According to the Project Reactor documentation regarding processors:

direct (DirectProcessor and UnicastProcessor): These processors can only push data through direct user action (calling their Sink's methods directly).

synchronous (EmitterProcessor and ReplayProcessor): These processors can push data both through user action and by subscribing to an upstream Publisher and synchronously draining it.

UnicastProcessor shouldn't be able to subscribe to an upstream Publisher. There documentation offers an example of the direct user Sink invocation:

UnicastProcessor<String> hotSource = UnicastProcessor.create();

Flux<String> hotFlux = hotSource.publish()
                                .autoConnect()
                                .map(String::toUpperCase);

hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

hotSource.onNext("blue");

However I have tried subscribing directly a UnicastProcessor to a Publisher and it works. This shouldn't be possible as stated in the documentation. Is the doc wrong of am I missing something?

In the following example, I'm subscribing the UnicastProcessor to an upstream Flux without any problem:

    val latch = CountDownLatch(20)

    val numberGenerator: Flux<Long> = counter(1000)
    val processor = UnicastProcessor.create<Long>()

    val connectableFlux = numberGenerator.subscribeWith(processor)

    connectableFlux.subscribe {
        logger.info("Element [{}]", it)
    }

    latch.await()

Log:

12:50:12.193 [main] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
12:50:12.196 [main] INFO reactor.Flux.Map.1 - request(unbounded)
12:50:13.203 [parallel-1] INFO reactor.Flux.Map.1 - onNext(0)
12:50:13.203 [parallel-1] INFO com.codependent.Test - Element [0]

Solution

  • Yes it seems this aspect of the documentation is outdated. Even DirectProcessor can be used as a Subscriber and propagate signals to its own subscribers.

    NB: You used an EmitterProcessor in your snippet, but it still behaves the same with UnicastProcessor.