java spring rx-java reactive-programming project-reactor

Upgrading io.projectreactor version from 2.0.x to 3.0.4 - Using Spring framework


I have an issue while trying to make the upgrade.

Currently I'm using version 2.0.x, in particular:

reactor.bus
reactor.rx.Stream
reactor.rx.Streams
reactor.core.processor.RingBufferProcessor
reactor.fn.Consumer

I'm using Maven, and I have a single 'projectreactor' dependency:

<groupId>io.projectreactor.spring</groupId>
<artifactId>reactor-spring-core</artifactId>

When upgrading to version 3.0.4.RELEASE, in order to keep using everything I used before, I need to explicitly import

<groupId>io.projectreactor</groupId>
<artifactId>reactor-bus</artifactId>

and

<groupId>io.projectreactor</groupId>
<artifactId>reactor-stream</artifactId>

but I'm still missing

reactor.core.processor.RingBufferProcessor
reactor.fn.Consumer

and I'm not sure what to do.


Solution

  • reactor.rx.Stream -> reactor.core.publisher.Flux
    reactor.rx.Streams -> reactor.core.publisher.Flux
    reactor.rx.Promise -> reactor.core.publisher.Mono and reactor.core.publisher.MonoProcessor
    reactor.core.processor.RingBufferProcessor -> reactor.core.publisher.TopicProcessor
    reactor.fn.Consumer -> java.util.function.Consumer (Java 8)
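
As a small illustration of the last mapping, here is a minimal sketch of a handler written against java.util.function.Consumer rather than reactor.fn.Consumer. The names ConsumerMigration, CollectingHandler and dispatch are hypothetical and exist only for this example; the point is that the handler body stays an accept(T) method, so in many cases only the import needs to change.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class ConsumerMigration {

    // Hypothetical handler implementing the JDK interface instead of
    // reactor.fn.Consumer; the single method to implement is accept(T).
    public static final class CollectingHandler implements Consumer<String> {
        public final List<String> seen = new ArrayList<>();

        @Override
        public void accept(String event) {
            seen.add(event);
        }
    }

    // Hypothetical stand-in for whatever component used to invoke the
    // handler: it simply calls accept on every event, in order.
    public static <T> void dispatch(List<T> events, Consumer<? super T> handler) {
        for (T event : events) {
            handler.accept(event);
        }
    }

    public static void main(String[] args) {
        CollectingHandler handler = new CollectingHandler();
        // andThen is a default method on java.util.function.Consumer that
        // chains a second action after the first one.
        Consumer<String> chained = handler.andThen(e -> System.out.println("handled " + e));
        dispatch(Arrays.asList("a", "b"), chained);
        System.out.println(handler.seen);
    }
}
```

Lambdas and method references also satisfy java.util.function.Consumer, so short handlers can usually be passed inline to Flux.subscribe instead of being declared as classes.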
    

    There is no new Spring module, since Spring 5 directly includes Reactor support with these new types.
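
For reference, the 3.x types listed in the mapping (Flux, Mono, TopicProcessor) live in the reactor-core artifact. A minimal dependency sketch, assuming the 3.0.4.RELEASE version mentioned in the question, could look like this:

```xml
<!-- Sketch only: intended to stand in for the 2.0.x reactor-spring-core
     dependency; adjust the version to match your build -->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.0.4.RELEASE</version>
</dependency>
```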

    As for reactor-bus: by design, all stream routes (Flux/Mono chains) are now typed, so dynamic routing is not part of our features yet. There are still typed alternatives, for instance:

    ReplayProcessor<MyEvent> rp = ReplayProcessor.create();
    Flux<MyEvent> interest1 = rp.filter(ev -> filterInterest1(ev));
    Flux<MyEvent> interest2 = rp.filter(ev -> filterInterest2(ev));
    Flux<MyEvent> interest1_2 = rp.filter(ev -> filterInterest1(ev) || filterInterest2(ev));
    
    interest1.subscribe(doSomethingForInterest1);
    interest2.subscribe(doSomethingForInterest2);
    interest1_2.subscribe(doSomethingForInterest1_2);
    
    rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react
    rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react
    rp.onNext(new MyEvent("interest2")); //subscriber 2 and 3 react
    rp.onNext(new MyEvent("interest4")); // no subscriber matches yet; ReplayProcessor buffers it for a later interested subscriber
    
    //shutdown/cleanup/close
    rp.onComplete();
    

    I have also found this on GitHub, which seems to fit your needs