I am learning reactive streams and working on Publishers(Flux), and working for the transformation of Flux. For this i got compose and transform methods.
Here is my code:
private static void composeStream() {
System.out.println("*********Calling composeStream************");
Function<Flux<String>, Flux<String>> alterMap = f -> {
return f.filter(color -> !color.equals("ram"))
.map(String::toUpperCase);
};
Flux<String> compose = Flux.fromIterable(Arrays.asList("ram", "sam", "kam", "dam"))
.doOnNext(System.out::println)
.compose(alterMap);
compose.subscribe(d -> System.out.println("Subscriber to Composed AlterMap :"+d));
System.out.println("-------------------------------------");
}
private static void transformStream() {
System.out.println("*********Calling transformStream************");
Function<Flux<String>, Flux<String>> alterMap = f -> f.filter(color -> !color.equals("ram"))
.map(String::toUpperCase);
Flux.fromIterable(Arrays.asList("ram", "sam", "kam", "dam"))
.doOnNext(System.out::println)
.transform(alterMap)
.subscribe(d -> System.out.println("Subscriber to Transformed AlterMap: "+d));
System.out.println("-------------------------------------");
}
and here is the output, which is same for both the cases:
*********Calling transformStream************
ram
sam
Subscriber to Transformed AlterMap: SAM
kam
Subscriber to Transformed AlterMap: KAM
dam
Subscriber to Transformed AlterMap: DAM
-------------------------------------
*********Calling composeStream************
ram
sam
Subscriber to Composed AlterMap :SAM
kam
Subscriber to Composed AlterMap :KAM
dam
Subscriber to Composed AlterMap :DAM
-------------------------------------
What is the difference between the two? Please suggest
Transform this
Flux
in order to generate a targetFlux
. UnlikeFlux#compose(Function)
, the provided function is executed as part of assembly.
If we will write a small test like next:
int[] counter = new int[1];
Function transformer = f -> {
counter[0]++;
return f;
}
Flux flux = flux Flux.just("")
.transform(transformer);
System.out.println(counter[0]);
flux.subscribe();
flux.subscribe();
flux.subscribe();
System.out.println(counter[0]);
In the output we will observe next result:
1
1
Which means that transform function will be executed once during assembling of the pipe, so in other words transformation function will be executed eagerly.
In turn, with .compose
we will get next behavior for the same code
int[] counter = new int[1];
Function transformer = f -> {
counter[0]++;
return f;
}
Flux flux = flux Flux.just("")
.compose(transformer);
System.out.println(counter[0]);
flux.subscribe();
flux.subscribe();
flux.subscribe();
System.out.println(counter[0]);
And output
0
3
Which means that for each subscriber transformation function will be executed separately, and we may consider that kind of execution as lazy