Project reactor upgrade from v2 to v3?


Is there really no official project reactor upgrade guide from v2 to v3?

I've searched far and wide, even using the Wayback Machine, but I've had no luck finding any upgrade documentation for going from v2 to v3 (v2.0.6.RELEASE to v3.5.15, to be more precise).

Now, I'm aware that I'm coming in a bit late with this question, given that the first v3 came out somewhere in 2017, but the current version is still v3, so I guess it shouldn't be too late, right?

The best resource I've found so far is this GH issue comment, but I have several other questions.

Specifically, what do I use in v3 instead of these v2 classes:

  1. reactor.core.Dispatcher
  2. reactor.core.dispatch.SynchronousDispatcher
  3. reactor.core.dispatch.RingBufferDispatcher
  4. reactor.core.processor.RingBufferProcessor (the comment I linked to above says reactor.core.publisher.TopicProcessor, but that too has since been removed)
  5. reactor.rx.action.Action
  6. reactor.rx.stream.GroupedStream (I assume reactor.core.publisher.GroupedFlux, since Stream -> Flux, but what do I use instead of GroupedStream.lift?)

Thanks.


Solution

  • OK, let me answer each of the items from my question with a separate answer (for readability reasons) in the hope that it might help others in the future.

    1. Dispatcher -> Scheduler
    2. SynchronousDispatcher -> Schedulers.immediate()

    Old:

    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.function.Consumer;
    import java.util.function.Predicate;
    
    import reactor.core.Dispatcher;
    import reactor.core.dispatch.SynchronousDispatcher;
    
    public final class ReactiveEventRouter {
      // dropping unimportant, unchanged parts and field modifiers for brevity
      SynchronousDispatcher syncDispatcher = new SynchronousDispatcher();
      CopyOnWriteArrayList<Subscription<?>> subscriptions = new CopyOnWriteArrayList<>();
      reactor.fn.Consumer<Throwable> errorConsumer = (th) -> log.error("Error while processing event", th);
    
      public <E> void tryRoute(final Object event, Subscription<E> subscription, Dispatcher dispatcher) {
        if (subscription.test(event)) {
          dispatcher.dispatch(event, subscription::accept, errorConsumer);
        }
      }
    
      public <E> void tryRoute(final Object event, Subscription<E> subscription) {
        tryRoute(event, subscription, syncDispatcher);
      }
    
      public void route(final Object event) {
        route(event, syncDispatcher);
      }
    
      public void route(final Object event, Dispatcher dispatcher) {
        subscriptions.forEach(subscription -> {
          if (subscription.test(event)) {
            dispatcher.dispatch(event, subscription::accept, errorConsumer);
          }
        });
      }
    }
    

    New:

    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.function.Consumer;
    import java.util.function.Predicate;
    
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Scheduler;
    import reactor.core.scheduler.Schedulers;
    
    public final class ReactiveEventRouter {
      Scheduler immediateScheduler = Schedulers.immediate();
      CopyOnWriteArrayList<Subscription<?>> subscriptions = new CopyOnWriteArrayList<>();
      Consumer<Throwable> errorConsumer = (th) -> log.error("Error while processing event", th);
    
      public <E> void tryRoute(final Object event, Subscription<E> subscription, Scheduler scheduler) {
        if (subscription.test(event)) {
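          // Pass both callbacks to subscribe() so that errorConsumer also sees
          // exceptions thrown by subscription.accept(); a doOnError placed
          // before doOnNext only observes upstream errors.
          Mono.just(event)
              .subscribeOn(scheduler)
              .subscribe(subscription::accept, errorConsumer);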
        }
      }
    
      public <E> void tryRoute(final Object event, Subscription<E> subscription) {
        tryRoute(event, subscription, immediateScheduler);
      }
    
      public void route(final Object event) {
        route(event, immediateScheduler);
      }
    
      public void route(final Object event, Scheduler scheduler) {
        subscriptions.forEach(subscription -> {
          if (subscription.test(event)) {
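            // Same pattern as in tryRoute: errorConsumer handles failures
            // raised by subscription.accept() as well as upstream errors.
            Mono.just(event)
                .subscribeOn(scheduler)
                .subscribe(subscription::accept, errorConsumer);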
          }
        });
      }
    }
    

    For reference: the simple nested class ReactiveEventRouter.Subscription hasn't changed, but I'm including it here to avoid confusion with org.reactivestreams.Subscription:

    public static class Subscription<E> {
      final Predicate<Object> matcher;
      final Consumer<E> consumer; // this is java.util.function.Consumer
    
      Subscription(Predicate<Object> matcher, Consumer<E> consumer) {
        this.matcher = matcher;
        this.consumer = consumer;
      }
    
      boolean test(Object obj) {
        return matcher.test(obj);
      }
    
      @SuppressWarnings("unchecked") // safe only if the matcher admits nothing but instances of E
      void accept(Object evt) {
        consumer.accept((E) evt);
      }
    }
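
    To make the matcher/consumer contract of Subscription concrete, here is a minimal sketch that uses only java.util and no Reactor at all. The SubscriptionDemo class and its routeAll helper are hypothetical names introduced for illustration; the loop mirrors what route does with an immediate scheduler, assuming each matcher only lets through events of the type its consumer expects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class SubscriptionDemo {

  // Copy of the nested Subscription class, repeated so the sketch compiles on its own.
  static class Subscription<E> {
    final Predicate<Object> matcher;
    final Consumer<E> consumer;

    Subscription(Predicate<Object> matcher, Consumer<E> consumer) {
      this.matcher = matcher;
      this.consumer = consumer;
    }

    boolean test(Object obj) {
      return matcher.test(obj);
    }

    @SuppressWarnings("unchecked")
    void accept(Object evt) {
      consumer.accept((E) evt);
    }
  }

  // Hypothetical helper: synchronously hands each event to every matching subscription
  // and returns what the consumers recorded, in delivery order.
  static List<String> routeAll(List<Object> events) {
    List<String> received = new ArrayList<>();
    List<Subscription<?>> subscriptions = Arrays.asList(
        new Subscription<String>(e -> e instanceof String, s -> received.add("string:" + s)),
        new Subscription<Integer>(e -> e instanceof Integer, i -> received.add("int:" + (i + 1))));

    for (Object event : events) {
      for (Subscription<?> subscription : subscriptions) {
        if (subscription.test(event)) { // the matcher guards the unchecked cast in accept()
          subscription.accept(event);
        }
      }
    }
    return received;
  }

  public static void main(String[] args) {
    // The Double matches no subscription and is silently skipped.
    System.out.println(routeAll(Arrays.<Object>asList("hello", 41, 2.5)));
    // prints [string:hello, int:42]
  }
}
```

    The cast in accept is only as safe as the matcher: a Subscription<String> whose matcher accepted every object would fail with a ClassCastException inside the consumer, which is the kind of failure errorConsumer is meant to catch in the router.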