Search code examples
scalareactive-programmingakka-streamrx-java2monix

How to broadcast a cold observable: Replay with back-pressure?


I am actually using Scala but the question is generic to all Rx and streaming frameworks.

My use case is that I have a generated observable (thus cold) and I want multiple consumers to consume the exact same values in parallel, but I expect them to have significantly different throughput.

What I need could be done by broadcasting an observable with replay, but I see that the common policy of replay with a max buffer size is to drop elements from the buffer on overflow (that are then lost for the slowest consumers) instead of back-pressuring the producer. It makes sense if you treat all broadcasted observables as hot, but, in my case, I know that it actually is cold and can be back-pressured.

Is there some way to achieve this in any of the JVM reactive stream compliant frameworks?

Many thanks!


Solution

  • RxJava supports this via the publish operator which coordinates requests from individual consumers, that is, it requests at a fixed rate as fast as the slowest consumer requests. Unfortunately, there is no RxScala 2 currently and only RxJava 2 supports the Reactive-Streams specification, therefore, you may have a bit of an inconvenience turning this into Scala:

    Flowable.fromPublisher(Flowable.range(1, 1000))
    .publish(f -> 
        Flowable.mergeArray(
            f.observeOn(Schedulers.computation()).map(v -> v * v),
            f.observeOn(Schedulers.computation()).map(v -> v * v * v)
        )
     )
     .blockingSubscribe(System.out::println);
    

    The alternative is to use a ConnectableObservable and connect manually once all consumers have subscribed:

    ConnectableFlowable<Integer> co = Flowable.fromPublisher(Flowable.range(1, 1000))
        .publish();
    
    co.observeOn(Schedulers.computation()).map(v -> v * v)
      .subscribe(System.out::println);
    
    co.connect();