
Reactor - How to use filterWhen and still pass the value for logging purposes if it doesn't pass the filter?


I am trying to figure out the right way to do the following.

  • I want to check whether a record exists in the database by name, which is a global secondary index. (Let's say the primary key is id.)
  • If an item with that name already exists, log its id and name and return an error.
  • If no item has the given name, proceed.

Right now, the code structure looks like this.

private fun checkExistingData(name: String): Mono<QueryResponse> {
  return repo.getDataByName(name)
    .filterWhen { Mono.just(!it.hasItems()) }
    .switchIfEmpty {
      // log the existing id and name from QueryResponse
      Mono.error(SomeCustomException())
    }
    .flatMap {
      // proceed
    }
}

As you can see, the filter drops the QueryResponse before switchIfEmpty runs, so to log the id there I would have to call repo.getDataByName(name) a second time just to retrieve the item. That is wasteful, since I already made the same query before switchIfEmpty.

What's the right way to do this?


Solution

  • Pending more information on the QueryResponse API, I'm going to assume a few things:

      - getDataByName returns a Mono<QueryResponse>. This Mono is ALWAYS valued, i.e. it always emits exactly one QueryResponse whether or not data could be found.
      - QueryResponse#items is what I'll use in my example to access the rows themselves. I'm also going to assume it returns a Flux<Item>.

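    First off, filterWhen is not needed here: the check is synchronous, so the plain filter method, which takes a predicate returning a boolean, does the same job without wrapping the result in Mono.just. I also think the reversed filter logic is a bit harder to follow.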
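
    To illustrate the point about filter versus filterWhen, here is a minimal sketch. The QueryResponse class below is a hypothetical stand-in (the real one is not shown in the question), and keepEmptyAsync / keepEmptySync are names made up for this example.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical stand-in for the real QueryResponse; only hasItems() matters here.
data class QueryResponse(val ids: List<String>) {
    fun hasItems(): Boolean = ids.isNotEmpty()
}

// filterWhen expects a Publisher<Boolean>, so a synchronous check must be wrapped in Mono.just.
fun keepEmptyAsync(source: Mono<QueryResponse>): Mono<QueryResponse> =
    source.filterWhen { Mono.just(!it.hasItems()) }

// filter takes a plain predicate and gives the same result for a synchronous check.
fun keepEmptySync(source: Mono<QueryResponse>): Mono<QueryResponse> =
    source.filter { !it.hasItems() }

fun main() {
    val empty = Mono.just(QueryResponse(emptyList()))
    val found = Mono.just(QueryResponse(listOf("42")))
    // A response without items passes the filter; one with items is dropped,
    // so block() on the filtered Mono yields null in the second case.
    println(keepEmptySync(empty).block())
    println(keepEmptySync(found).block())
}
```

    Either way, the value that fails the filter is gone by the time switchIfEmpty runs, which is why logging the id there requires a second query.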

    Why not do everything in the flatMap instead?

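    private fun checkExistingData(name: String): Mono<QueryResponse> {
      return repo.getDataByName(name)
        .flatMap { response ->
          if (response.hasItems()) {
            // The response is still in scope here, so the existing row can be logged.
            response.items()
              .single() // assumes exactly one row matches the name
              .doOnNext { existing -> logExisting(existing.id(), existing.name()) }
              .then(Mono.error<QueryResponse>(SomeCustomException()))
          } else {
            proceed() // assumed to return a Mono<QueryResponse>
          }
        }
    }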
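
    For anyone who wants to try the flatMap approach in isolation, here is a self-contained sketch under the same assumptions. Item, QueryResponse, FakeRepo, Checker and DuplicateNameException are all hypothetical stand-ins invented for this example; the real repository and exception types will differ, and Mono.just(response) is only a placeholder for the real "proceed" step.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical stand-ins for the real types; shapes follow the assumptions stated above.
data class Item(val id: String, val name: String)

data class QueryResponse(val rows: List<Item>) {
    fun hasItems(): Boolean = rows.isNotEmpty()
    fun items(): Flux<Item> = Flux.fromIterable(rows)
}

class DuplicateNameException(message: String) : RuntimeException(message)

// In-memory fake repository: always emits exactly one QueryResponse, possibly with no rows.
class FakeRepo(private val data: List<Item>) {
    fun getDataByName(name: String): Mono<QueryResponse> =
        Mono.fromSupplier { QueryResponse(data.filter { it.name == name }) }
}

class Checker(private val repo: FakeRepo) {
    // Collected log lines, kept in a list so the example needs no logging library.
    val logged = mutableListOf<String>()

    fun checkExistingData(name: String): Mono<QueryResponse> =
        repo.getDataByName(name).flatMap { response ->
            if (response.hasItems()) {
                response.items()
                    .single() // assumes exactly one row matches the name
                    .doOnNext { logged.add("exists: id=${it.id}, name=${it.name}") }
                    .then(Mono.error<QueryResponse>(DuplicateNameException(name)))
            } else {
                Mono.just(response) // placeholder for the real "proceed" step
            }
        }
}

fun main() {
    val checker = Checker(FakeRepo(listOf(Item("42", "alice"))))
    // Unused name: the response flows through to the "proceed" branch.
    println(checker.checkExistingData("bob").block())
    // Taken name: the existing row is logged, then the Mono terminates with an error.
    val outcome = runCatching { checker.checkExistingData("alice").block() }
    println(outcome.exceptionOrNull())
    println(checker.logged)
}
```

    Because the branch lives inside flatMap, the QueryResponse is queried once and stays available for logging. Note that single() signals an error of its own if more than one row matches; next() would be an alternative if only the first match matters.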