Search code examples
javaproject-reactorspring-webclient

Is it possible to create a dynamic filter/predicate in Mono/Flux?


The app is a simple processing - reading data, filtering it, process and write filtered data.

Here is a simple code which runs without any issue:

  void run() {
    Flux.interval(Duration.ofMillis(200))
        .filter(value -> getPredicate().test(value))
        .flatMap(this::writeData)
        .subscribe();
  }

  private Predicate<Long> getPredicate() {
    return value -> value % 2 == 0;
  }

Is it possible to have dynamic predicate which will be retrieved from remote web service with periodic requests? If possible - how to use Mono<Predicate> inside .filter() and keep it non-blocking

For example replacing getPredicate() with below:

  private Mono<Predicate<Long>> getPredicateFromRemoteServer() {
    return webClient.get()
        .uri("/filters/1")
        .retrieve()
        .bodyToMono(Filter.class)
        .map(this::mapToPredicate)
        .cache(v -> Duration.ofMinutes(10), ex -> Duration.ZERO, () -> Duration.ZERO);
  }

  private Predicate<Long> mapToPredicate(Filter filter) {
    // here will be converting filter object into predicate
    return value -> value > 5;
  }

Ideally I would like to avoid cache(Duration.ofMinutes(10)) because filter could be updated each minute, or each day... and once filter is updated my service get notified, but I didn't find a way to invalidate cache externally, that's why Duration.ofMinutes(10) is used for some approximate invalidation.


Solution

  • Well, perhaps you could write the pipeline a bit differently. Instead of aspiring to return a new Predicate every time your process an item in your stream by calling getPredicateFromRemoteServer(), you could make the function itself your predicate. Pass the value you are processing from the stream and make it return a Mono<Boolean> with the answer and use that in a filterWhen pipe in your pipeline.

    For example, somewhat like this:

    private Mono<Boolean> isWithinThreshold(int value) {
        return webClient.get()
            .uri("/filters/1")
            .retrieve()
            .bodyToMono(Filter.class)
            .map(filter -> filter.threshold <= value)
            .cache(v -> Duration.ofMinutes(10), ex -> Duration.ZERO, () -> Duration.ZERO);
      }
    

    Then in your main pipeline you can do:

    Flux.interval(Duration.ofMillis(200))
            .filterWhen(value -> isWithinThreshold(value))
            .flatMap(this::writeData)
            .subscribe();
      }