java, project-reactor, reactor

Project Reactor: split a publisher into two with a minimum of two subscribers


How can you split a publisher into two in Reactor, so that two identical streams of data exist and can be processed downstream in different flows?

I want to map over each stream and subscribe to each one individually.

I cannot see anything in the API that supports this.

I also need to wait until both subscribers are up and ready before publishing starts.


Solution

  • Thanks for the input; I was not thinking straight. Of course, you can simply attach multiple subscribers, like so:

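      import reactor.core.publisher.Flux

      // A cold Flux: each subscriber gets its own run through the source.
      val flux = Flux.just("MyData1", "MyData2", "MyData3")
    
      flux.doOnNext { println("Subscribing one$it") }.subscribe()
    
      flux.doOnNext { println("Subscribing Two$it") }.subscribe()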
    

    This will output:

    Subscribing oneMyData1
    Subscribing oneMyData2
    Subscribing oneMyData3
    Subscribing TwoMyData1
    Subscribing TwoMyData2
    Subscribing TwoMyData3
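
    The second subscriber only starts after the first has finished because a plain Flux.just is cold: every subscribe() call triggers a fresh subscription to the source. A minimal sketch that counts those subscriptions, assuming reactor-core is on the classpath (the function name countColdSubscriptions is made up for illustration):

    ```kotlin
    import reactor.core.publisher.Flux
    import java.util.concurrent.atomic.AtomicInteger

    // Hypothetical helper: counts how often the cold source is subscribed to.
    fun countColdSubscriptions(): Int {
        val subscriptions = AtomicInteger()
        val flux = Flux.just("MyData1", "MyData2", "MyData3")
            .doOnSubscribe { subscriptions.incrementAndGet() }

        flux.subscribe() // first independent run through the source
        flux.subscribe() // second independent run through the source
        return subscriptions.get()
    }

    fun main() {
        println(countColdSubscriptions())
    }
    ```

    Two subscribers mean two separate subscriptions to the source, so the streams are identical but not shared.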
    

    As suggested above, there is share(), but that API does not let you set a minimum number of subscribers. In my case I want to wait until there are two subscribers, so it is better to call the underlying operators that share() is an alias for, publish() and refCount(), directly. The docs for share() state that it returns:

    a Flux that upon first subscribe causes the source Flux to subscribe once, late subscribers might therefore miss items.

    val flux = Flux.just("MyData1", "MyData2", "MyData3").publish().refCount(2)
    

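    With refCount(2), the source is subscribed only once two subscribers have attached, so no messages are missed even if the second subscriber is slow to start. Both subscribers then receive each item in turn, which results in the following output: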

    Subscribing oneMyData1
    Subscribing TwoMyData1
    Subscribing oneMyData2
    Subscribing TwoMyData2
    Subscribing oneMyData3
    Subscribing TwoMyData3
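
    To check that nothing is published before the second subscriber arrives, the events can be recorded in order. This is only a sketch, assuming reactor-core is on the classpath and that everything runs synchronously on the calling thread (no schedulers); the function name refCountEvents and the marker string are made up for illustration:

    ```kotlin
    import reactor.core.publisher.Flux

    // Hypothetical helper: records what happens around the two subscribe() calls.
    fun refCountEvents(): List<String> {
        val events = mutableListOf<String>()
        val flux = Flux.just("MyData1", "MyData2", "MyData3")
            .publish()
            .refCount(2) // connect to the source only once two subscribers exist

        flux.subscribe { events += "one:$it" }
        events += "first subscriber attached" // marker between the two subscribe() calls
        flux.subscribe { events += "two:$it" } // second subscriber triggers the connection
        return events
    }

    fun main() {
        refCountEvents().forEach(::println)
    }
    ```

    In this sketch the marker entry comes first in the list, since no item is delivered until the second subscribe() call; the items then alternate between the two subscribers as in the output above.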