Search code examples
reactive-programmingproject-reactor

Combine two Flux instances of different types


With SpringBoot 2 and with the Poi class (Point of Interest):

public class Poi {
public Poi(String poidId, Double price, Double latitude, Double longitude) {...}
private String poidId;
private Double latitude;
private Double longitude;
private Double price;
//And Getters and Setters
}

I have 2 Flux of Poi:

Flux<Poi> availablePoisFlux;
Flux<Poi> poiFlux;

The first element availablePoisFlux contains Pois with:

  • a poidId
  • NO latitude information
  • NO longitude information
  • a price information

The second element poiFlux contains Pois with:

  • a poidId
  • a latitude
  • a longitude
  • NO price information

(poidId is an identifier of a Poi).

I want to create a new Flux resultPoisFlux with Pois (with poidId, price, longitude and latitude) from two Flux (poiFlux and availablePoisFlux).

The poidId attribut is the key between the two Flux (poiFlux and availablePoisFlux).

Sample Implementation:

I think I can use the zipWith operator to do that, but I need some informations and advices with reactive operators (and filter ?)

I want to iterate from the first Flux and get informations (price) from the second flux using the poidId identifier and update price attribute with the correct value.

Sample Input Values:

poiFlux = Poi(poidId=poiId0, price=null, name=name0, latitude=2.2222, longitude=14.222)
poiFlux = Poi(poidId=poiId1, price=null, name=name1, latitude=3.2222, longitude=15.222)
poiFlux = Poi(poidId=poiId2, price=null, name=name2, latitude=4.2222, longitude=16.222)
poiFlux = Poi(poidId=poiId3, price=null, name=name3, latitude=5.2222, longitude=17.222)
poiFlux = Poi(poidId=poiId4, price=null, name=name4, latitude=6.2222, longitude=18.222)
poiFlux = Poi(poidId=poiId5, price=null, name=name5, latitude=7.2222, longitude=19.222)
poiFlux = Poi(poidId=poiId6, price=null, name=name6, latitude=8.2222, longitude=20.222)
poiFlux = Poi(poidId=poiId7, price=null, name=name7, latitude=9.2222, longitude=21.222)
poiFlux = Poi(poidId=poiId8, price=null, name=name8, latitude=10.2222, longitude=22.222)
poiFlux = Poi(poidId=poiId9, price=null, name=name9,  latitude=11.2222, longitude=23.222)

availablePoisFlux = Poi(poidId=poiId0, price=120.0, name=name0, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId1, price=120.0, name=name1, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId2, price=120.0, name=name2, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId3, price=120.0, name=name3, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId4, price=120.0, name=name4, latitude=null, longitude=null)

Expected Result:

resultPoisFlux = Poi(poidId=poiId0, price=120.0, name=name0, latitude=2.2222, longitude=14.222)
resultPoisFlux = Poi(poidId=poiId1, price=120.0, name=name1, latitude=3.2222, longitude=15.222)
resultPoisFlux = Poi(poidId=poiId2, price=120.0, name=name2, latitude=4.2222, longitude=16.222)
resultPoisFlux = Poi(poidId=poiId3, price=120.0, name=name3, latitude=5.2222, longitude=17.222)
resultPoisFlux = Poi(poidId=poiId4, price=120.0, name=name4, latitude=6.2222, longitude=18.222)

Something like that:

Flux<Poi> resultPoisFlux = availablePoisFlux.zipWith(poiFlux, (a, b) -> new Poi(a.getPoidId(), a.getPrice(), getLatitudeFromPoiFluxByPoidId(a.getPoidId()), getLongitudeFromPoiFluxByPoidId(a.getPoidId())))....

Thanks for you help.


Solution

  • zip/zipWith, but it only combines two sources pair-wise...

    ...as long as it has enough elements to make pairs. So it is only useful in your case if you are guaranteed that the elements come in the same order in both source, and there are no discrepancies in poiIds on each side. In your example that is the case because, even though second source only has 4 elements, these elements are the same as the beginning of the first source.

    poiFlux.zipWith(availablePoisFlux, (a, b) -> new Poi(a.getPoiId(), 
        b.getPrice(),
        a.getLatitude(),
        a.getLongitude(),
        a.getName()));
    

    More generic solution, less reactive one

    If there is no such guarantee, you need to somehow combine two disordered and disjoint sequences. You cannot do that without collecting all the element in one of the sources (preferably availablePoisFlux), which means it will delay the processing of the other source until said source has completed.

    One way of combining would be to collect all values into a map keyed by poiId and then "iterate" over the second source. Since some elements might not be found in the map, you need handle to be able to "skip" these:

    availablePoisFlux.collectMap(Poi::getId, Poi::getPrice)
        .flatMapMany(knownPrices -> poiFlux.handle((poi, sink) -> {
            String poiId = poi.getPoiId();
            if (knownPrices.containsKey(poiId) {
                Double price = knownPrices.get(poiId);
                Poi complete = new Poi(poiId, price, poi.getLatitude(),
                    poi.getLongitude(), poi.getName());
                sink.next(complete);
            } //else do nothing and let handle skip that poi
        }));