
Error when combining two Flux using zipWith


I am learning reactive streams and trying to combine two Flux instances as follows:

        List<Integer> elems = new ArrayList<>();
        Flux.just(10,20,30,40)
            .log()
            .map(x -> x * 2)
            .zipWith(Flux.range(0, Integer.MAX_VALUE), 
                    (two, one) -> String.format("First  : %d, Second : %d \n", one, two))
            .subscribe(elems::add);

When calling subscribe, I get the following compile error:

Multiple markers at this line
    - The method subscribe(Consumer<? super String>) in the type Flux<String> is not applicable for the arguments 
     (elems::add)
    - The type List<Integer> does not define add(String) that is applicable here

The IDE offered several quick-fix suggestions, but none of these alternatives worked. Any suggestions on how to solve this issue?


Solution

  • Sometimes method references make you overlook the obvious. I have rewritten your code, but with an anonymous class in place of the method reference.

        List<Integer> elems = new ArrayList<>();
        Flux.just(10,20,30,40)
            .log()
            .map(x -> x * 2)
            .zipWith(Flux.range(0, Integer.MAX_VALUE),
                (two, one) -> String.format("First  : %d, Second : %d \n", one, two))
            .subscribe(new Consumer<String>() {
              @Override
              public void accept(String s) {
                // s is a String here, so it cannot be added to a List<Integer>
              }
            });
    

    I used code completion in my IDE (IntelliJ) to create this anonymous class. As you can see, the input to this consumer is a String, which comes from

    String.format("First  : %d, Second : %d \n", one, two)
    

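    So the compiler is complaining that you cannot add a String to a List<Integer>, which is what you are trying to do with elems::add. Either declare elems as a List<String>, or have the zipWith combinator return an Integer instead of a formatted String.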
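
    The same typing rule can be checked without Reactor on the classpath. The sketch below is only an illustration: the `emit` helper is hypothetical and stands in for `Flux<String>.subscribe(Consumer<? super String>)`, and the class name is made up. It shows that `elems::add` is accepted once `elems` is a `List<String>`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SubscribeTypeDemo {

    // Hypothetical stand-in for Flux<String>.subscribe(Consumer<? super String>):
    // it hands every String to the consumer, as subscribe would for each emitted item.
    static void emit(List<String> items, Consumer<? super String> consumer) {
        items.forEach(consumer);
    }

    static List<String> collect() {
        // The element type now matches what the pipeline produces (String).
        List<String> elems = new ArrayList<>();

        // Mimics map(x -> x * 2) zipped with an index and formatted to a String.
        int[] values = {10, 20, 30, 40};
        List<String> formatted = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            formatted.add(String.format("First  : %d, Second : %d", i, values[i] * 2));
        }

        // Compiles: List<String>::add accepts a String, so it fits Consumer<? super String>.
        // With List<Integer> elems, this line fails with the error from the question.
        emit(formatted, elems::add);
        return elems;
    }

    public static void main(String[] args) {
        collect().forEach(System.out::println);
    }
}
```

    In the original pipeline the equivalent change is to declare `List<String> elems = new ArrayList<>();` and leave the `subscribe(elems::add)` call as it is.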