I am working with Reactive Streams publishers (Mono and Flux) and combining two publishers using the zip and zipWith methods of Flux, as follows:
Flux<String> flux1 = Flux.just(" {1} ","{2} ","{3} ","{4} " );
Flux<String> flux2 = Flux.just(" |A|"," |B| "," |C| ");
Flux.zip(flux1, flux2,
(itemflux1, itemflux2) -> "[ "+itemflux1 + ":"+ itemflux2 + " ] " )
.subscribe(System.out::print);
and here is the output:
[ {1} : |A| ] [ {2} : |B| ] [ {3} : |C| ]
As flux1 has four elements and flux2 has three, the fourth element of flux1 gets lost. When I tried to print the logs of the flux, there was no information about what happened to the fourth element.
Here is the statement for printing logs:
Flux.zip(flux1, flux2,
(itemflux1, itemflux2) -> "[ "+itemflux1 + ":"+ itemflux2 + " ] " ).log()
.subscribe(System.out::print);
And here is the console output when using the log method:
[info] onSubscribe(FluxZip.ZipCoordinator)
[info] request(unbounded)
[info] onNext([ {1} : |A| ] )
[ {1} : |A| ] [info] onNext([ {2} : |B| ] )
[ {2} : |B| ] [info] onNext([ {3} : |C| ] )
[ {3} : |C| ] [info] onComplete()
The documentation of the zip method says:
The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.
But in my case, it did not log any error or any message about the lost element.
How can I get information about the lost element?
Please suggest.
zip/zipWith will output as many pairs as there are elements in the shortest Flux. It cancels the longer Flux upon termination of the smallest one, which should be visible if you put the log() on the source Flux instead of the zipped one.
This is demonstrated by this snippet (which is tuned to show 1-by-1 requests and run as a unit test, hence the hide()/zipWith(..., 1) and blockLast()):
@Test
public void test() {
Flux<Integer> flux1 = Flux.range(1, 4).hide().log("\tFLUX 1");
Flux<Integer> flux2 = Flux.range(10, 2).hide().log("\tFlux 2");
flux1.zipWith(flux2, 1)
.log("zipped")
.blockLast();
}
Which outputs:
11:57:21.072 [main] INFO zipped - onSubscribe(FluxZip.ZipCoordinator)
11:57:21.077 [main] INFO zipped - request(unbounded)
11:57:21.079 [main] INFO FLUX 1 - onSubscribe(FluxHide.HideSubscriber)
11:57:21.079 [main] INFO FLUX 1 - request(1)
11:57:21.079 [main] INFO FLUX 1 - onNext(1)
11:57:21.079 [main] INFO Flux 2 - onSubscribe(FluxHide.HideSubscriber)
11:57:21.080 [main] INFO Flux 2 - request(1)
11:57:21.080 [main] INFO Flux 2 - onNext(10)
11:57:21.080 [main] INFO zipped - onNext([1,10])
11:57:21.080 [main] INFO FLUX 1 - request(1)
11:57:21.080 [main] INFO FLUX 1 - onNext(2)
11:57:21.080 [main] INFO Flux 2 - request(1)
11:57:21.080 [main] INFO Flux 2 - onNext(11)
11:57:21.080 [main] INFO zipped - onNext([2,11])
11:57:21.080 [main] INFO FLUX 1 - request(1)
11:57:21.080 [main] INFO FLUX 1 - onNext(3)
11:57:21.080 [main] INFO Flux 2 - request(1)
11:57:21.080 [main] INFO Flux 2 - onComplete()
11:57:21.081 [main] INFO FLUX 1 - cancel() <----- HERE
11:57:21.081 [main] INFO Flux 2 - cancel()
11:57:21.081 [main] INFO zipped - onComplete()
This is the "until any of the sources completes" part.
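The pairing rule described above can also be modeled without Reactor. The following is a minimal plain-Java sketch, assuming list inputs that stand in for the two sources; it is not Reactor's implementation, and the ZipSketch and Result names are hypothetical. It only illustrates why the trailing elements of the longer source never reach the zipped output.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java model of shortest-source pairing; this is NOT Reactor's implementation.
final class ZipSketch {

    // Hypothetical holder for the pairs produced and the elements left unpaired.
    static final class Result {
        final List<String> pairs = new ArrayList<>();
        final List<String> unpaired = new ArrayList<>();
    }

    static Result zip(List<String> first, List<String> second) {
        Result result = new Result();
        Iterator<String> a = first.iterator();
        Iterator<String> b = second.iterator();
        // Pair elements only while both sources still have one to offer.
        while (a.hasNext() && b.hasNext()) {
            result.pairs.add("[" + a.next() + ":" + b.next() + "]");
        }
        // Whatever remains in the longer source never gets a partner.
        a.forEachRemaining(result.unpaired::add);
        b.forEachRemaining(result.unpaired::add);
        return result;
    }

    public static void main(String[] args) {
        Result r = zip(List.of("{1}", "{2}", "{3}", "{4}"), List.of("|A|", "|B|", "|C|"));
        System.out.println("pairs:    " + r.pairs);
        System.out.println("unpaired: " + r.unpaired);
    }
}
```

With the inputs from the question, the model yields three pairs and leaves {4} unpaired. In Reactor the unpaired elements are not collected anywhere. The longer source is simply cancelled, which is why the cancel() signal in the source-level log is the only trace of them.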