I am actually using Scala but the question is generic to all Rx and streaming frameworks.
My use case is that I have a generated observable (thus cold) and I want multiple consumers to consume the exact same values in parallel, but I expect them to have significantly different throughput.
What I need could be done by broadcasting an observable with replay, but the common policy for replay with a maximum buffer size is to drop elements from the buffer on overflow (so they are lost to the slowest consumers) instead of back-pressuring the producer. That makes sense if you treat every broadcast observable as hot, but in my case I know the source is actually cold and can be back-pressured.
Is there some way to achieve this in any of the JVM reactive stream compliant frameworks?
Many thanks!
RxJava supports this via the publish operator, which coordinates requests from the individual consumers; that is, it requests from upstream at a fixed rate, only as fast as the slowest consumer requests. Unfortunately, there is currently no RxScala 2, and only RxJava 2 supports the Reactive-Streams specification, so turning this into Scala may be a bit inconvenient:
    Flowable.fromPublisher(Flowable.range(1, 1000))
        .publish(f ->
            Flowable.mergeArray(
                f.observeOn(Schedulers.computation()).map(v -> v * v),
                f.observeOn(Schedulers.computation()).map(v -> v * v * v)
            )
        )
        .blockingSubscribe(System.out::println);
The alternative is to use a ConnectableFlowable (the Flowable counterpart of ConnectableObservable) and connect manually once all consumers have subscribed:
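    ConnectableFlowable<Integer> co = Flowable.fromPublisher(Flowable.range(1, 1000))
        .publish();

    // Subscribe every consumer first; nothing is emitted until connect().
    co.observeOn(Schedulers.computation()).map(v -> v * v)
        .subscribe(System.out::println);
    co.observeOn(Schedulers.computation()).map(v -> v * v * v)
        .subscribe(System.out::println);

    // Start the shared upstream only after all consumers are attached.
    co.connect();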
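If pulling RxJava into a Scala project is awkward, roughly the same "slowest consumer governs the producer" behaviour can be sketched with the JDK alone. Since Java 9, java.util.concurrent.Flow mirrors the Reactive Streams interfaces, and SubmissionPublisher.submit blocks the producer while any subscriber's buffer is full instead of dropping items. Note that this is a different mechanism from RxJava's publish (it blocks the producing thread rather than coordinating requests), and the names SlowestConsumerDemo, Collector and run, the buffer size of 4, and the 2 ms delay are all made up for illustration:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class SlowestConsumerDemo {

    // Hypothetical subscriber: requests one item at a time and sleeps to simulate work.
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> seen = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final long delayMillis;
        private Flow.Subscription subscription;

        Collector(long delayMillis) {
            this.delayMillis = delayMillis;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);
        }

        @Override
        public void onNext(Integer item) {
            seen.add(item);
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            subscription.request(1); // ask for the next item only when this one is done
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1..count to a fast and a slow consumer and returns what each one saw.
    static List<List<Integer>> run(int count) {
        Collector fast = new Collector(0);
        Collector slow = new Collector(2);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Small per-subscriber buffer (assumed value) so the producer is throttled early.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(pool, 4)) {
            publisher.subscribe(fast);
            publisher.subscribe(slow);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks while any subscriber's buffer is full
            }
        } // close() signals onComplete once the buffered items are delivered
        try {
            fast.done.await();
            slow.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return List.of(fast.seen, slow.seen);
    }

    public static void main(String[] args) {
        List<List<Integer>> results = run(50);
        System.out.println("fast saw " + results.get(0).size()
                + ", slow saw " + results.get(1).size());
    }
}
```

Because submit never drops, both consumers end up with the same sequence; the price is that the fast consumer can only run ahead of the slow one by the buffer size. If you would rather drop or time out than block, SubmissionPublisher.offer is the variant to look at.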