Tags: java, reactive-programming, project-reactor, reactor, reactive-streams

Project Reactor: Do I need a Processor?


I'm trying to design a pipeline framework on top of Reactor.

At each stage (not counting the first and the last), we have tasks that transform an object (e.g. a string to its length, or a URL to its HTML content). Here's an example:

[Image: pipeline diagram (not available)]

You can see that the middle layer has 3 tasks, and each task transforms an X object to a Y object (by the way, the layers are always fully connected).

My Question/Dilemma: My first thought was that all I need is to combine the sources with Flux.merge() and then subscribe each Subscriber to the result. For example:

Flux<X> source = Flux.merge(x1Flux, x2Flux);
source.subscribe(y1Subscriber);
source.subscribe(y2Subscriber);
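
One practical implication of this first option can be sketched as follows, assuming x1Flux and x2Flux are cold publishers (the question does not say). Each subscribe() call then triggers its own subscription to both sources, so any work the sources do runs once per subscriber. The class name, the sample values, and the counter are hypothetical and only there for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

final class ColdMergeSketch {

    // Counts how often the two (assumed cold) sources are subscribed to
    // when two subscribers attach to the merged Flux.
    static int countSourceSubscriptions() {
        AtomicInteger subscriptions = new AtomicInteger();

        // Flux.defer runs its supplier once per subscription, which makes
        // every subscription to a source visible in the counter.
        Flux<String> x1Flux = Flux.defer(() -> {
            subscriptions.incrementAndGet();
            return Flux.just("a");
        });
        Flux<String> x2Flux = Flux.defer(() -> {
            subscriptions.incrementAndGet();
            return Flux.just("b");
        });

        Flux<String> source = Flux.merge(x1Flux, x2Flux);
        source.subscribe(v -> { });  // stands in for y1Subscriber
        source.subscribe(v -> { });  // stands in for y2Subscriber

        return subscriptions.get();  // 2 sources x 2 subscribers = 4
    }

    public static void main(String[] args) {
        System.out.println("source subscriptions: " + countSourceSubscriptions());
    }
}
```

If the sources are expensive (an HTTP fetch, for instance), this repetition is the main reason to share a single subscription among the subscribers instead.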

Another option is to put a Processor (TopicProcessor?) in between, acting as middleware (as in the pub-sub pattern).

I don't understand which solution best fits my problem. Logically they are the same, but what are the practical implications of each architecture?

Thanks!


Solution

  • My general approach here would be to use ConnectableFlux to delay publishing until the entire pipeline is set up, and then call connect() on each flux.

    You could use a processor, but I'd advise avoiding that wherever possible.

    The general gist (not checked for syntax) would be something akin to:

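    // publish() returns a ConnectableFlux that does not subscribe to its source until connect() is called.
    ConnectableFlux<String> x1 = Flux.just("x1").publish();
    ConnectableFlux<String> x2 = Flux.just("x2").publish();
    
    // Middle layer: each task receives the merged output of every source.
    ConnectableFlux<String> y1 = Flux.merge(x1, x2).publish();
    ConnectableFlux<String> y2 = Flux.merge(x1, x2).publish();
    ConnectableFlux<String> y3 = Flux.merge(x1, x2).publish();
    
    // Last layer: receives the merged output of every middle-layer task.
    ConnectableFlux<String> z3 = Flux.merge(y1, y2, y3).publish();
    
    // Connect downstream stages before the sources: a stage only subscribes to its
    // inputs when it is connected, so connecting x1 and x2 first can emit values
    // before anything is listening.
    z3.connect();
    y1.connect();
    //...etc., ending with x1.connect() and x2.connect()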
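
A runnable variant of the gist above, as a minimal sketch rather than a definitive implementation. The map() steps are hypothetical stand-ins for the real tasks, PipelineSketch is an assumed name, and collectList().toFuture() is used only as a convenient way to attach a final consumer before anything is connected. It connects the middle layer before the sources, so that every stage is already subscribed when values start flowing.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

final class PipelineSketch {

    // Builds a 2-source, 3-task pipeline and returns everything the final stage received.
    static List<String> run() {
        // Sources: nothing is emitted until connect() is called.
        ConnectableFlux<String> x1 = Flux.just("x1").publish();
        ConnectableFlux<String> x2 = Flux.just("x2").publish();

        // Middle layer: every task sees the output of every source (fully connected).
        // The map() calls are placeholders for the real transformations.
        Flux<String> xs = Flux.merge(x1, x2);
        ConnectableFlux<String> y1 = xs.map(s -> s + "->y1").publish();
        ConnectableFlux<String> y2 = xs.map(s -> s + "->y2").publish();
        ConnectableFlux<String> y3 = xs.map(s -> s + "->y3").publish();

        // Final consumer: toFuture() subscribes immediately, before any connect().
        CompletableFuture<List<String>> result =
                Flux.merge(y1, y2, y3).collectList().toFuture();

        // Connect downstream first, sources last.
        y1.connect();
        y2.connect();
        y3.connect();
        x1.connect();
        x2.connect();

        return result.join();  // six values: each source value passed through each task
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```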
    

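    Also note that you may wish to use concat() or mergeSequential() rather than merge(), depending on your use case: merge() subscribes to all publishers eagerly and emits values as they arrive, so values from different sources may interleave; concat() subscribes to each publisher only after the previous one has completed; and mergeSequential() subscribes eagerly but emits the values in the order of the sources, without interleaving.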
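
The difference between the three operators can be seen in a small sketch. The two timed sources here are hypothetical and exist only to make the ordering visible; the delays are arbitrary, and the merge() result depends on timing.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class MergeVariantsSketch {

    // Hypothetical source "a": emits a1 at ~100 ms and a2 at ~200 ms after subscription.
    static Flux<String> a() {
        return Flux.just("a1", "a2").delayElements(Duration.ofMillis(100));
    }

    // Hypothetical source "b": same pace, but starts ~50 ms later (b1 at ~150 ms, b2 at ~250 ms).
    static Flux<String> b() {
        return Flux.just("b1", "b2")
                .delayElements(Duration.ofMillis(100))
                .delaySubscription(Duration.ofMillis(50));
    }

    // Eager subscription, values emitted as they arrive (typically interleaved here).
    static List<String> merged() {
        return Flux.merge(a(), b()).collectList().block();
    }

    // b is subscribed to only after a completes, so all of a comes first.
    static List<String> concatenated() {
        return Flux.concat(a(), b()).collectList().block();
    }

    // Eager subscription, but values are emitted in source order: all of a, then all of b.
    static List<String> mergedSequentially() {
        return Flux.mergeSequential(a(), b()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("merge:           " + merged());
        System.out.println("concat:          " + concatenated());
        System.out.println("mergeSequential: " + mergedSequentially());
    }
}
```

With hot sources such as the ConnectableFlux stages above, concat()'s late subscription may mean that values from the later sources are missed, so it is worth checking that behavior against your own sources before choosing it.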