Search code examples
project-reactorreactive-streams

Flux.zip method not emitting all elements


I am working with Reactive Stream and Publishers (Mono and Flux), and combining the two publishers using the zip and zipWith method of Flux as follows:

Flux<String> flux1 = Flux.just(" {1} ","{2} ","{3} ","{4} " );
Flux<String> flux2 = Flux.just(" |A|"," |B| "," |C| ");
Flux.zip(flux1, flux2,
                    (itemflux1, itemflux2) -> "[ "+itemflux1 + ":"+ itemflux2 + " ] " )
            .subscribe(System.out::print);

and here is the output:

[  {1} : |A| ] [ {2} : |B|  ] [ {3} : |C|  ] 

AS flux1 has four elements and flux2 has three elements, the forth element in flux1 gets lost. And when i tried to print the logs of the flux, there is no information about what happened to the forth element.

Here is the statement for printing logs:

Flux.zip(flux1, flux2,
                (itemflux1, itemflux2) -> "[ "+itemflux1 + ":"+ itemflux2 + " ] " ).log()
        .subscribe(System.out::print);

and here is the console logs with using log method:

[info] onSubscribe(FluxZip.ZipCoordinator)
[info] request(unbounded)
[info] onNext([  {1} : |A| ] )
[  {1} : |A| ] [info] onNext([ {2} : |B|  ] )
[ {2} : |B|  ] [info] onNext([ {3} : |C|  ] )
[ {3} : |C|  ] [info] onComplete()

From the documentation of zip method, i got

The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

But in my case, it did not logged any error and did not logged any message about the lost element.

How can i get the information about the lost element?

Please suggest.


Solution

  • zip/zipWith will output as many pairs as there are elements in the shortest Flux. It cancels the longer Flux upon termination of the smallest one, which should be visible if you put the log() on the source Flux instead of the zipped one.

    This is demonstrated by this snippet (which is tuned to show 1-by-1 requests and run as a unit test, hence the hide()/zipWith(..., 1) and blockLast()):

    @Test
    public void test() {
        Flux<Integer> flux1 = Flux.range(1, 4).hide().log("\tFLUX 1");
        Flux<Integer> flux2 = Flux.range(10, 2).hide().log("\tFlux 2");
    
        flux1.zipWith(flux2, 1)
            .log("zipped")
            .blockLast();
    }
    

    Which outputs:

    11:57:21.072 [main] INFO  zipped - onSubscribe(FluxZip.ZipCoordinator)
    11:57:21.077 [main] INFO  zipped - request(unbounded)
    11:57:21.079 [main] INFO    FLUX 1 - onSubscribe(FluxHide.HideSubscriber)
    11:57:21.079 [main] INFO    FLUX 1 - request(1)
    11:57:21.079 [main] INFO    FLUX 1 - onNext(1)
    11:57:21.079 [main] INFO    Flux 2 - onSubscribe(FluxHide.HideSubscriber)
    11:57:21.080 [main] INFO    Flux 2 - request(1)
    11:57:21.080 [main] INFO    Flux 2 - onNext(10)
    11:57:21.080 [main] INFO  zipped - onNext([1,10])
    11:57:21.080 [main] INFO    FLUX 1 - request(1)
    11:57:21.080 [main] INFO    FLUX 1 - onNext(2)
    11:57:21.080 [main] INFO    Flux 2 - request(1)
    11:57:21.080 [main] INFO    Flux 2 - onNext(11)
    11:57:21.080 [main] INFO  zipped - onNext([2,11])
    11:57:21.080 [main] INFO    FLUX 1 - request(1)
    11:57:21.080 [main] INFO    FLUX 1 - onNext(3)
    11:57:21.080 [main] INFO    Flux 2 - request(1)
    11:57:21.080 [main] INFO    Flux 2 - onComplete()
    11:57:21.081 [main] INFO    FLUX 1 - cancel() <----- HERE
    11:57:21.081 [main] INFO    Flux 2 - cancel()
    11:57:21.081 [main] INFO  zipped - onComplete()
    

    This is the "until any of the sources completes" part.