Search code examples
javaspringspring-webfluxproject-reactorreactor

rolling back all changes when errors happen in ParallelFlux


I am using spring webflux with reactor and I have the following flux for uploading images, resizing them and storing them. For each size I execute the described flux in parallel on a custom executor service.

Any of the methods createDbAttachmentEntity, resizeAttachment, storeFile can throw various exceptions.

Executing the resizing in parallel means that any thread involved may throw exception. That means I need to rollback all the changes in case something was not properly executed.

For example I may have 5 sizes but in DB the system added only 4 and 5 are expected. Or I may have an error in converting my streams. Or I may have an error in the storage system.

In case of exception I would like to be able to rollback all the changes (manually delete database entries and manually delete the files.

How can I do that?

Flux.just(sizes)
        .parallel()
        .runOn(Schedulers.fromExecutor(executorService))
        .map(size -> createDbAttachmentEntity(size))
        .map(size_attachment -> resizeAttachment(size_attachment.getT1(), originalBytes))
        .map(size_attachment_bytes -> storeFile(...))
        .sequential()
        .collectList()
        .map(list -> {
            if(list.size() != sizes.length
                    || list.stream().anyMatch(objects -> objects.getT2().getId() == null)) {
                throw new RuntimeException();
            }
            return list;
        })
        .flux()
  here  .onErrorReturn(.......deleteEntities...........deleteFiles...........)     // problem: I do not have the files/entities
        .flatMap(list -> Flux.fromStream(list.stream()))
        .collectMap(Tuple2::getT1, Tuple2::getT2);

I was thinking about solving it with this but it doesn't work

Flux.just(1, 2, 3, 4, 5, 6, 7)
            .map(integer -> {
                if (integer == 3) throw new RuntimeException("3");
                return integer;
            })
            .flatMap(integer -> Flux.just(integer)
                    .onErrorResume(t -> {
                        System.out.println("--onErrorResume" + integer); // here is what I need to pass in
                        return Flux.empty();
                    }))

Solution

  • If I understand the requirements correctly could do something similar to this

    Custom Exception:

    public class FluxEntitiesException extends RuntimeException {
    
        private Set<Entity> entities;
        public FluxEntitiesException() {
            super();
        }
        public FluxEntitiesException(String s) {
            super(s);
        }
        public FluxEntitiesException(String message, Throwable cause) {
            super(message, cause);
        }
        public FluxEntitiesException(Throwable cause) {
            super(cause);
        }
    
        public FluxEntitiesException(Set<Entity> entities) {
            super();
            this.entities = entities;
        }
    
        public Set<Entity> getEntities() {
            return entities;
        }
    
        static final long serialVersionUID = -1848914673093119416L;
    }
    

    Example processor:

    public void processError(FluxEntitiesException e){
        for(Entity entity: e.entities){
            //DO SOMETHING TO ROLL BACK
            System.out.println("Rolling back: " + entity);
        }
    }
    

    Example entity:

    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    public class Entity {
        private Integer id;
        //WHATEVER ELSE
    }
    

    Example Repo method:

    public Entity addEntity(Entity entity) throws RuntimeException{
        if(entity.getId() >4)
            throw new RuntimeException("OH NO!");
        System.out.println("Added entity" + entity);
        return entity;
    }
    

    Example method:

    public void AddToDb(){
        Set<Entity> entities = Collections.synchronizedSet(new HashSet<>());
    
        Flux.just(1,2,3,4,5,6,7)
                //Parallel stuff isn't needed was just an example since you use it in original
                .parallel()
                .runOn(Schedulers.boundedElastic())
                .map(s -> addEntity(new Entity(s)))
                .doOnNext(entities::add)
                .sequential()
                .doOnError(e -> processError(new FluxEntitiesException(entities)))
                .collectList()
                .subscribe(s -> System.out.println(entities));
    }