Search code examples
javaerror-handlingspring-webfluxproject-reactor

How to get the data in Flux onError


The question is not specific to database, but depicted it with the database example

  • I fetch some entities from database with a reactive driver
  • Convert them in Item entity using itemMapper.
  • save them in database
  • if error comes logs the error.

This works with below

 reactiveTemplate
          .query(cql, itemMapper) // This returns an item object
          .doOnNext(this::saveRecord) //this save item Object
          .doOnError(this::processErrorRecord);  // Logs the exception 

But If I want to log which item got the error, how to access failedIte in doOnError method?

 reactiveTemplate
      .query(cql, itemMapper) // This returns an item object
      .doOnNext(this::saveRecord) //this save item Object
      .doOnError(th -> processErrorRecord(<failed Item>,th)); //how to access the item which got error

Solution

  • It's hard to suggest something without knowing all details but here are your options. doOnNext is an operator typically used for "side-effect" logic such as logging, metrics, ... to execute non-reactive code. I'm not really sure why saveRecord is not reactive because it looks like you are using reactiveTemplate to query data.

    In case saveRecord is using reactiveTemplate you should return Mono<Void> (not void). In this case you can apply the following

    reactiveTemplate
            .query(cql, itemMapper) // This returns an item object
            .flatMap(record -> 
                    saveRecord(record) //this save item Object
                        .doOnError(th -> {
                            log.error("error while saving record: {}", record);
                            processErrorRecord(record, th);
                        })
            );
    
    

    Not sure what is the logic of processErrorRecord but in case this is reactive, use onErrorResume instead of doOnError.

    In case saveRecord is not reactive and blocking, you need to execute it on a separate Scheduler. Check How Do I Wrap a Synchronous, Blocking Call? for details

    reactiveTemplate
            .query(cql, itemMapper) // This returns an item object
            .flatMap(record ->
                    Mono.fromRunnable(() -> saveRecord(record))
                        .subscribeOn(Schedulers.boundedElastic())    
                        .doOnError(th -> {
                            log.error("error while saving record: {}",record)
                            processErrorRecord(record, th);
                        })
            );