java spring-boot functional-programming spring-webflux project-reactor

Flux.switchIfEmpty - what if it doesn't switch when first Flux is completed?


I have the following scenario: two endpoints (let's call them A and B) return values that I need to process and report to a third endpoint (let's call it C). Values that can't be combined or processed must also be reported to the same endpoint C, at the very end.

I did this by creating a Flux from each of the endpoints A and B and combining them with combineLatest. When I process the combined values, I post the results to endpoint C. If the values can't be processed, I store them in a Map.

After some thought I decided to create a third Flux from this Map, to be processed once the two endpoints stop returning values. When they stop, I complete each of the Fluxes A and B, which in turn should complete flux1. combineLatest ensures that I keep polling either A or B after the other one completes.

I have a method that creates a Flux from the Map above. I am hoping this method will be called only when the first Flux completes, and I try to achieve this with switchIfEmpty.

Here is some pseudocode below:

var flux1 = Flux.combineLatest(fluxA, fluxB, Tuple<A,B>)
                .processRecords(Tuple<A,B>)
                .prepareDataToBeSent();

var flux2 = createFluxFromValuesInMap();

flux1.switchIfEmpty(flux2)
     .flatMap(sendResult)
     .subscribe();

The first part, the processing of flux1, works, but flux2 is never processed. At first I thought this was because the Map in question is empty at the very beginning, since it only has something in it once flux1 has finished processing all values. But then I saw that the method where flux2 is created isn't even called.

Could that be because flux1 actually completes?

I am not sure how else to do this with Reactor.

UPDATE: After some testing, it seems that if flux1 completes, flux2 does not start emitting. Is there any way to chain Fluxes so that one starts when the other completes?


Solution

  • An empty flux is a stream that completes without emitting a single value.

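    Therefore, switchIfEmpty switches to the fallback only when the upstream completes without having emitted any value. Here flux1 emits values before it completes, so it is not empty and flux2 is never subscribed.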

    What you want is flux1.concatWith(flux2), which subscribes to flux2 after flux1 completes, whether or not flux1 emitted any elements.
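To make the difference concrete, here is a minimal sketch that contrasts the two operators. It is only an illustration under stated assumptions: the class name, the `processed` and `leftovers` helpers, and the rule that odd numbers "cannot be processed" are all hypothetical stand-ins for the question's flux1, flux2, and Map. Wrapping the second Flux in `Flux.defer` is an extra precaution I am assuming is wanted here, so that the Map is read only when flux1 has completed rather than when the pipeline is assembled.

```java
import reactor.core.publisher.Flux;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConcatAfterCompletion {

    // Hypothetical stand-in for flux1: even numbers are "processed" and emitted,
    // odd numbers are stored in the map as values that could not be processed.
    static Flux<String> processed(Map<Integer, String> leftovers) {
        return Flux.just(1, 2, 3, 4)
                .<String>handle((i, sink) -> {
                    if (i % 2 == 0) {
                        sink.next("processed-" + i);
                    } else {
                        leftovers.put(i, "leftover-" + i);
                    }
                });
    }

    // Hypothetical stand-in for flux2: defer delays reading the map
    // until this Flux is actually subscribed.
    static Flux<String> leftovers(Map<Integer, String> leftovers) {
        return Flux.defer(() -> Flux.fromIterable(leftovers.values()));
    }

    // flux1 emits values, so it is not empty and the fallback is never subscribed.
    static List<String> withSwitchIfEmpty() {
        Map<Integer, String> map = new LinkedHashMap<>();
        return processed(map).switchIfEmpty(leftovers(map)).collectList().block();
    }

    // concatWith subscribes to the second Flux once the first one completes.
    static List<String> withConcatWith() {
        Map<Integer, String> map = new LinkedHashMap<>();
        return processed(map).concatWith(leftovers(map)).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(withSwitchIfEmpty()); // [processed-2, processed-4]
        System.out.println(withConcatWith());    // [processed-2, processed-4, leftover-1, leftover-3]
    }
}
```

With `switchIfEmpty` the leftovers are silently dropped, because the first Flux emitted at least one element. With `concatWith` they follow the processed values, so a single downstream `flatMap(sendResult)` would see both.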