How can you split a publisher into two in Reactor, so that two identical streams of data exist and can be processed downstream in separate flows?
That way I can map over each stream and subscribe to each one individually.
I cannot see anything in the API that suggests this is supported.
I need to wait until both subscribers are up and ready before publishing.
Thanks for the input, I was not thinking straight. Of course, you can just have multiple subscribers, like so:
val flux = Flux.just("MyData1", "MyData2", "MyData3")
flux.doOnNext { println("Subscribing one$it") }.subscribe()
flux.doOnNext { println("Subscribing Two$it") }.subscribe()
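Flux.just is a cold publisher, so each subscriber triggers its own run of the data and the second run starts only after the first has finished. This will output: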
Subscribing oneMyData1
Subscribing oneMyData2
Subscribing oneMyData3
Subscribing TwoMyData1
Subscribing TwoMyData2
Subscribing TwoMyData3
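To illustrate the "map over each stream and subscribe individually" part of the question, here is a minimal sketch, assuming reactor-core is on the classpath. The function name runTwoPipelines and the two map steps (upper-casing and taking the length) are hypothetical, chosen only to show that each pipeline sees the full data set independently.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical helper: runs two independent pipelines over the same cold Flux
// and returns what each pipeline received.
fun runTwoPipelines(): Pair<List<String>, List<Int>> {
    val flux = Flux.just("MyData1", "MyData2", "MyData3")
    val upper = mutableListOf<String>()
    val lengths = mutableListOf<Int>()

    // Each subscribe() call starts its own run of the cold source,
    // so the two map steps never interfere with each other.
    flux.map { it.uppercase() }.subscribe { upper.add(it) }
    flux.map { it.length }.subscribe { lengths.add(it) }

    return upper to lengths
}

fun main() {
    val (upper, lengths) = runTwoPipelines()
    println(upper)   // [MYDATA1, MYDATA2, MYDATA3]
    println(lengths) // [7, 7, 7]
}
```

This works because Flux.just emits synchronously on the subscribing thread. With an expensive or one-shot source, two subscriptions would mean the work is done twice, which is where sharing comes in.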
As suggested above, there is share(), but that API does not let you set a minimum number of subscribers. The docs describe what it returns as:
a Flux that upon first subscribe causes the source Flux to subscribe once, late subscribers might therefore miss items.
In my case I want to wait until there are two subscribers, so it is better to call the underlying operators, publish() and refCount(), directly:
val flux = Flux.just("MyData1", "MyData2", "MyData3").publish().refCount(2)
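With refCount(2), the source is not subscribed until the second subscriber has arrived, so no messages are missed if there is a delay in starting the second subscriber. Both subscribers now receive each item from a single run of the source, which results in the following interleaved output: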
Subscribing oneMyData1
Subscribing TwoMyData1
Subscribing oneMyData2
Subscribing TwoMyData2
Subscribing oneMyData3
Subscribing TwoMyData3
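The waiting behaviour can be checked with a small sketch, assuming reactor-core is on the classpath. The function name runShared and the event log are hypothetical, added only to record when items arrive relative to the first subscription.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical helper: records the order of events when two subscribers
// attach to a Flux shared via publish().refCount(2).
fun runShared(): List<String> {
    val events = mutableListOf<String>()
    val flux = Flux.just("MyData1", "MyData2", "MyData3")
        .publish()
        .refCount(2) // connect to the source only once two subscribers exist

    flux.subscribe { events.add("one:$it") }
    // Nothing has been emitted yet: only one subscriber is attached.
    events.add("first subscribed")
    // The second subscription reaches the threshold and triggers the connection.
    flux.subscribe { events.add("two:$it") }

    return events
}

fun main() {
    runShared().forEach(::println)
}
```

The "first subscribed" marker should appear before any data item, which shows that the first subscriber waited for the second one instead of consuming the source on its own.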