I'm struggling a bit with the Project Reactor documentation. I have some experience with Akka Streams, but right now I'm working on a project that uses Project Reactor.
I need a Reactor operator that accepts a predefined sequence of steps and passes each message through it. It needs to behave like the .via() operator in Akka Streams.
For example, say we have the sequence A -> B -> C and I need to inject the sequence X1 -> X2 -> X3 after the B step, so that the final sequence is A -> B -> X1 -> X2 -> X3 -> C.
Does something like this exist in Reactor?
The closest thing to via in Project Reactor is the transform method.
Say that in Akka Streams you have this graph:
Source.single(10)
  .map(_ * -1) // some mapping
  .runWith(Sink.ignore)
and then you have this flow:
val flow = Flow[Int].map(_ * 2)
You can plug that flow into your graph like this:
Source.single(10)
  .map(_ * -1)
  .via(flow)
  .runWith(Sink.ignore)
The equivalent in Project Reactor looks like this.
Given a graph:
Flux.just(10)
    .map(x -> x * -1)
    .subscribe();
and a method that transforms a Flux<Integer> into a Publisher<Integer>:
public static class Transformers
{
    // Reusable stage that doubles every element (the counterpart of the Akka flow above)
    public static Publisher<Integer> flow(Flux<Integer> f)
    {
        return f.map(x -> x * 2);
    }
}
you can plug that method into your graph like this:
Flux.just(10)
    .map(x -> x * -1)
    .transform(Transformers::flow)
    .subscribe();
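Applied to the A -> B -> X1 -> X2 -> X3 -> C example from the question, this could look roughly like the sketch below. The class name ViaExample, the X_STAGES constant and the string-appending steps are made up for illustration; only Flux.just, map, transform, collectList and block are actual Reactor calls. Each step appends its own name so the order of execution is visible in the result.

```java
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;

public class ViaExample {

    // Hypothetical reusable stage X1 -> X2 -> X3, defined independently of any pipeline.
    static final Function<Flux<String>, Flux<String>> X_STAGES = flux -> flux
            .map(s -> s + "->X1")
            .map(s -> s + "->X2")
            .map(s -> s + "->X3");

    // Builds A -> B -> (X1 -> X2 -> X3) -> C and collects the emitted elements.
    static List<String> run() {
        return Flux.just("msg")
                .map(s -> s + "->A")
                .map(s -> s + "->B")
                .transform(X_STAGES)   // inject the reusable stage after B
                .map(s -> s + "->C")
                .collectList()
                .block();              // blocking only to keep the demo short
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [msg->A->B->X1->X2->X3->C]
    }
}
```

Because X_STAGES is just a Function over Flux, the same stage can be reused in any number of pipelines, much like a Flow value in Akka Streams.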
I wrote a post on this and other differences between the two APIs; perhaps you'll find it useful. The post is from 2019 and the APIs have evolved since then. For example, it mentions the compose method on Flux, which has since been renamed to transformDeferred. I am not sure what else has drifted since I wrote it, so be warned: Akka Streams vs Project Reactor API
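As far as I know, the practical difference between transform and transformDeferred is when the function runs: transform applies it once, when the pipeline is assembled, while transformDeferred applies it again for every subscriber. A minimal sketch to observe this, where the class name TransformVsDeferred and the invocations helper are hypothetical and exist only for this demo:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import reactor.core.publisher.Flux;

public class TransformVsDeferred {

    // Counts how often the transformer function itself is invoked
    // when the resulting Flux is subscribed to twice.
    static int invocations(boolean deferred) {
        AtomicInteger calls = new AtomicInteger();
        Function<Flux<Integer>, Flux<Integer>> flow = f -> {
            calls.incrementAndGet();
            return f.map(x -> x * 2);
        };

        Flux<Integer> source = Flux.just(10);
        Flux<Integer> result;
        if (deferred) {
            result = source.transformDeferred(flow); // function runs per subscription
        } else {
            result = source.transform(flow);         // function runs once, at assembly
        }

        result.blockLast(); // first subscription
        result.blockLast(); // second subscription
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(invocations(false)); // prints 1
        System.out.println(invocations(true));  // prints 2
    }
}
```

For a stateless stage like the doubling flow above the two behave the same, so transform is usually enough; transformDeferred matters when the function holds per-subscriber state.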