java spring spring-webflux spring-data-r2dbc

Save only unique elements from Flux<>


I'm pretty new to reactive programming and I'm having trouble understanding how to implement logic as simple as:

  • check whether an element exists in a database table
  • add the element if it's missing

So, in my code I have a Flux<> of addresses, and I need to add the missing addresses to the database:

Flux<StreetAddressDto> parseDictionaries = Flux.defer(() ->
    csvParser.parseFile(fileName)
        .doOnNext(streetAddressDto -> {
            log.info("PROCESSING {}", streetAddressDto);
            dictionaryService.updateDictionaries(streetAddressDto);
        }));

In dictionaryService.updateDictionaries() I look the address up in a Postgres table. If the value is not present yet, I save the new value.

Mono<TownAddressEntity> townAddressEntityMono = townAddressRepository.findByTownName(townAddress.getTownName())
                .switchIfEmpty(townAddressRepository.save(townAddress));

The problem appears when my Flux contains two identical address elements. townAddressRepository.findByTownName() is called for both elements, both calls return an empty result, and then townAddressRepository.save(townAddress) is called for both addresses as well. So after execution I have the same address in the database twice, but I need each unique address to be saved only once.
Could anybody help me understand how to fix this issue?


Solution

  • You can do something like this. If the address is found in the db, return Mono.empty(), which stops the stream for that element. If it is not found, emit the element so the stream continues, and then save it in the db.

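    public void run(String... args) throws Exception {
        Flux.just("address1", "address2", "address3")
            // drop addresses that are already stored
            .flatMap(s -> getIfNotInDb(s))
            // save the ones that remain
            .flatMap(s -> saveInDb(s))
            // nothing runs until the pipeline is subscribed
            .subscribe();
    }

    // Emits the address only if it is not in the database yet; completes empty otherwise.
    Mono<String> getIfNotInDb(String address) {
        return findInDb(address)
            .hasElement()
            .flatMap(found -> found ? Mono.empty() : Mono.just(address));
    }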
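
One caveat with the snippet above: flatMap subscribes to its inner publishers eagerly, so when the same address appears twice in the Flux, both lookups can still come back empty before either save has finished. A minimal sketch of one way around that, assuming reactor-core is on the classpath: drop repeats with distinct() and run the check-then-save step one element at a time with concatMap. The class name, saveIfMissing, saveUnique, and the in-memory table and saveCalls fields are hypothetical stand-ins for illustration; findInDb and saveInDb keep the names used above, but their bodies here only imitate a repository.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class UniqueSaveSketch {

    // Hypothetical in-memory stand-in for the database table.
    static final Set<String> table = ConcurrentHashMap.newKeySet();

    // Records every save call so the effect is easy to inspect.
    static final List<String> saveCalls = new CopyOnWriteArrayList<>();

    // Stand-in for a repository lookup: emits the address if stored, otherwise completes empty.
    static Mono<String> findInDb(String address) {
        return Mono.defer(() ->
            table.contains(address) ? Mono.just(address) : Mono.<String>empty());
    }

    // Stand-in for a repository save.
    static Mono<String> saveInDb(String address) {
        return Mono.fromCallable(() -> {
            table.add(address);
            saveCalls.add(address);
            return address;
        });
    }

    // Check-then-save for a single address; the save is deferred until the lookup is empty.
    static Mono<String> saveIfMissing(String address) {
        return findInDb(address)
            .switchIfEmpty(Mono.defer(() -> saveInDb(address)));
    }

    static Flux<String> saveUnique(Flux<String> addresses) {
        return addresses
            // drop repeated elements inside this stream (compared with equals/hashCode)
            .distinct()
            // subscribe to one check-then-save at a time, in source order
            .concatMap(UniqueSaveSketch::saveIfMissing);
    }
}
```

Calling UniqueSaveSketch.saveUnique(Flux.just("a", "b", "a")).blockLast() in this sketch saves "a" and "b" once each. Note that distinct() remembers every element it has seen, so memory grows with the number of unique addresses. Neither operator protects against another process writing to the same table at the same time; a unique constraint on the column is the usual guard for that case.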