According to the Project Reactor documentation regarding processors:
direct (DirectProcessor and UnicastProcessor): These processors can only push data through direct user action (calling their Sink's methods directly).
synchronous (EmitterProcessor and ReplayProcessor): These processors can push data both through user action and by subscribing to an upstream Publisher and synchronously draining it.
UnicastProcessor
shouldn't be able to subscribe to an upstream Publisher
. There documentation offers an example of the direct user Sink invocation:
UnicastProcessor<String> hotSource = UnicastProcessor.create();
Flux<String> hotFlux = hotSource.publish()
.autoConnect()
.map(String::toUpperCase);
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));
hotSource.onNext("blue");
However I have tried subscribing directly a UnicastProcessor
to a Publisher
and it works. This shouldn't be possible as stated in the documentation. Is the doc wrong of am I missing something?
In the following example, I'm subscribing the UnicastProcessor
to an upstream Flux
without any problem:
val latch = CountDownLatch(20)
val numberGenerator: Flux<Long> = counter(1000)
val processor = UnicastProcessor.create<Long>()
val connectableFlux = numberGenerator.subscribeWith(processor)
connectableFlux.subscribe {
logger.info("Element [{}]", it)
}
latch.await()
Log:
12:50:12.193 [main] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
12:50:12.196 [main] INFO reactor.Flux.Map.1 - request(unbounded)
12:50:13.203 [parallel-1] INFO reactor.Flux.Map.1 - onNext(0)
12:50:13.203 [parallel-1] INFO com.codependent.Test - Element [0]
Yes it seems this aspect of the documentation is outdated. Even DirectProcessor
can be used as a Subscriber
and propagate signals to its own subscribers.
NB: You used an EmitterProcessor
in your snippet, but it still behaves the same with UnicastProcessor
.