Tags: kotlin, reactive-programming, spring-webflux, project-reactor

Efficiently matching two Flux


I have two Flux instances with different element types, as shown below:

val dogs: Flux<Dog> = loadDogsFromFile()
val men: Flux<Man> = loadMenFromFile()

data class Dog(
    val name: String,
    val ownerName: String,
)


data class Man(
    val name: String,
    val dogOwnerName: String,
)

As you can see, the one field we can use to match these two Flux objects is the owner's name (Dog.ownerName on one side, Man.dogOwnerName on the other). Right now, this is how I am comparing them:

val matches = dogs.flatMap { dog ->
    men.map { man ->
        val isEqual = comparator(dog, man) // e.g. dog.ownerName == man.dogOwnerName
        Triple(dog, man, isEqual)
    }
}.filter { it.third }

This gets the job done, but it is nowhere near efficient. For each dog it keeps iterating over the men even after the matching owner has been found, so we have to use a filter operator to keep only the matches we need.
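To put a rough number on this, assume n dogs and m men (the value 10,000 below is purely illustrative). flatMap subscribes to men once per dog, and every (dog, man) pair is turned into a Triple and compared before the filter runs. The pipeline therefore performs:

```latex
\text{comparisons} \;=\; \sum_{i=1}^{n} m \;=\; n \cdot m,
\qquad n = m = 10\,000 \;\Rightarrow\; n \cdot m = 10^{8}
```

If every dog has exactly one matching man, only n of those pairs survive the filter. If loadMenFromFile() returns a cold Flux, each of the n subscriptions may also re-read the file.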

Edit

Regarding @marstran's comment about the input: I have large JSON files containing the dogs and the men, which I load here:

val dogs: Flux<Dog> = loadDogsFromFile()
val men: Flux<Man> = loadMenFromFile()

After matching each dog to its owner (a Man), I build a DogOwner object and save it to the database like this:

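val saved = dogs.flatMap { dog ->
    men.map { man ->
        val isEqual = comparator(dog, man)
        Triple(dog, man, isEqual)
    }
}.filter { it.third }
    .map { (dog, man, _) -> DogOwner(man, dog) }
    // assuming a reactive repository (e.g. ReactiveCrudRepository): save() returns a Mono
    // that must be subscribed, so flatMap is used here rather than doOnNext
    .flatMap { dogOwner -> dogOwnerRepository.save(dogOwner) }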

Solution

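  • Consider using the take(long n, boolean limitRequest) method of Flux:

        public final Flux<T> take(long n, boolean limitRequest)

        Take only the first N values from this Flux, if available.

    Applied to the inner men Flux, it lets you stop iterating over the men as soon as a dog's owner has been found. Once n values have been emitted, take cancels its upstream. With limitRequest set to true, it also caps the total amount requested upstream at n. See https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take-long-boolean-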

    Take a look at this Java example. For simplicity, dogs and men are plain integers, and a dog and its owner are assumed to have the same value.

    import org.junit.jupiter.api.Test;
    import reactor.core.publisher.Flux;
    import reactor.test.StepVerifier;
    import reactor.util.function.Tuples;

    class DogOwnerTest {

        @Test
        void dog_owner_test() {
            int COUNT = 10000;
            // dogs and men are plain integers: dog d is owned by man d
            Flux<Integer> dogs = Flux.range(0, COUNT);
            Flux<Integer> men = Flux.range(0, COUNT);

            dogs.flatMap(
                dog ->
                    men.map(m -> Tuples.of(dog, m))
                        .filter(tuple -> tuple.getT1().equals(tuple.getT2()))
                        .map(tuple -> new DogOwner(tuple.getT1(), tuple.getT2()))
                        // cancel the inner men Flux after the first match
                        .take(1, true)
            )
                .doOnNext(System.out::println)
                // here you can save DogOwner to db
                .as(StepVerifier::create)
                .expectNextCount(COUNT)
                .verifyComplete();
        }

        private static class DogOwner {
            private final Integer dog;
            private final Integer owner;

            DogOwner(Integer dog, Integer owner) {
                this.dog = dog;
                this.owner = owner;
            }

            @Override
            public String toString() {
                return "DogOwner{dog=" + dog + ", owner=" + owner + '}';
            }
        }
    }
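Even with take(1, true), each dog still scans the men until its owner appears, so the work remains quadratic in the worst case. A commonly used alternative is a hash join, which is a different technique from the take-based answer above. It indexes the men by the join key once and then looks each dog up.

The sketch below is plain Java without Reactor, so it runs on its own. It reuses the integer setup of the test above, and the class and method names are hypothetical. It counts the comparisons made by an early-exit scan and also shows the hash join.

In Reactor, the indexing step could be expressed with collectMap on the men Flux (keyed by dogOwnerName), followed by flatMapMany over the dogs. That holds all men in memory, and collectMap keeps only the last element for a duplicate key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class: plain Java (no Reactor) comparison of two matching strategies.
// As in the test above, dogs and men are Integers and dog d is owned by man d.
final class MatchingCost {

    // Early exit: the plain-loop analogue of men.filter(...).take(1, true)
    // inside dogs.flatMap(...). Returns how many equality checks were made.
    static long earlyExitComparisons(List<Integer> dogs, List<Integer> men) {
        long comparisons = 0;
        for (Integer dog : dogs) {
            for (Integer man : men) {
                comparisons++;
                if (dog.equals(man)) {
                    break; // owner found: stop scanning the men for this dog
                }
            }
        }
        return comparisons;
    }

    // Hash join: index the men by the join key once, then do one lookup per dog.
    // Returns the matched {dog, owner} pairs; dogs without an owner are skipped.
    static List<int[]> hashJoin(List<Integer> dogs, List<Integer> men) {
        Map<Integer, Integer> ownerByKey = new HashMap<>();
        for (Integer man : men) {
            ownerByKey.put(man, man); // a later man with the same key replaces an earlier one
        }
        List<int[]> pairs = new ArrayList<>();
        for (Integer dog : dogs) {
            Integer owner = ownerByKey.get(dog);
            if (owner != null) {
                pairs.add(new int[] {dog, owner});
            }
        }
        return pairs;
    }

    // Builds the list 0, 1, ..., count - 1 (stand-in for Flux.range(0, count)).
    static List<Integer> range(int count) {
        List<Integer> values = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            values.add(i);
        }
        return values;
    }

    public static void main(String[] args) {
        List<Integer> dogs = range(10_000);
        List<Integer> men = range(10_000);
        System.out.println(earlyExitComparisons(dogs, men)); // 50005000 = 10000 * 10001 / 2
        System.out.println(hashJoin(dogs, men).size());      // 10000
    }
}
```

With this setup, the early-exit scan needs d + 1 comparisons for dog d, or 50,005,000 in total. The hash join does about n + m map operations (m insertions, n lookups) in exchange for keeping every man in memory. With large JSON files, that memory cost is the trade-off to weigh.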