Search code examples
javascalaproject-reactorakka-stream

What is the Akka streams .via() equivalent in Project Reactor?


I'm kind of struggling with the Project Reactor documentation. I have some experience in Akka Streams but right now I'm working on a project that uses Project Reactor.

I need a Reactor operator that has the ability to accept a sequence to pass the message through. It needs to behave similarly to the .via() operator in Akka Streams.

For example, say we have a sequence of: A -> B -> C and I need to inject the sequence X1 -> X2 -> X3 after the B step. So the final sequence would be A -> B -> X1 -> X2 -> X3 -> C.

Does something like this exist in Reactor?


Solution

  • The closes to via in Project Reactor is the transform method.

    So in Akka say you have this graph:

    Source.single(10)
          .map(_ * -1) //some mapping
          .runWith(Sink.ignore)
    

    and then you have this flow:

    val flow = Flow[Int].map(_ * 2)
    

    you can plug in that flow to your graph like this:

    Source.single(10)
          .map(_ * -1)
          .via(flow)
          .runWith(Sink.ignore)
    

    The equivalent in Project Reactor would be this:

    Having a graph:

     Flux.just(10)
         .map(x -> x * -1)
         .subscribe();
    

    and a method to transform a Flux<Integer> into a Publisher<Integer>:

    public static class Transformers
    {
      public static Publisher<Integer> flow(Flux<Integer> f)
      {
        return f.map(x -> x * 2);
      }
    }
    

    you can plug in that method into your graph like this:

    Flux.just(10)
        .map(x -> x * -1)
        .transform(Transformers::flow)
        .subscribe();
    

    I wrote a post on this and other differences between the two APIs, perhaps you'll find it useful. This post is from 2019 and the APIs evolve. For example I mention compose method in context of Flux that has been renamed to transformDeferred since I wrote this, I am not sure what else drifted since I authored the post, so be warned: Akka Streams vs Project Reactor API