I am learning the reactive streams and trying to combine the two Flux as follows;
List<Integer> elems = new ArrayList<>();
Flux.just(10,20,30,40)
.log()
.map(x -> x * 2)
.zipWith(Flux.range(0, Integer.MAX_VALUE),
(two, one) -> String.format("First : %d, Second : %d \n", one, two))
.subscribe(elems::add);
and when calling subscribe, i got the following error:
Multiple markers at this line
- The method subscribe(Consumer<? super String>) in the type Flux<String> is not applicable for the arguments
(elems::add)
- The type List<Integer> does not define add(String) that is applicable here
and i got the following suggestions to solve the issues:
But none of these alternative worked. Any suggestions, how to solve this issue?
Sometimes method references makes you overlook the obvious. I have re-written your function, but with anonymous class.
List<Integer> elems = new ArrayList<>();
Flux.just(10,20,30,40)
.log()
.map(x -> x * 2)
.zipWith(Flux.range(0, Integer.MAX_VALUE),
(two, one) -> String.format("First : %d, Second : %d \n", one, two))
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
}
});
I have used code completion from my IDE(intellij) to create this anonymous class. As you can see the input to this consumer is a String
, which is coming from
String.format("First : %d, Second : %d \n", one, two)
So it is complaining that you cannot add a String
to a List<Integer>
which is what you are trying to do using elems:add