
How to run a list of DB update queries in one call in a Spring Boot WebFlux + R2DBC application


In my Spring Boot WebFlux + R2DBC application, the controller should accept a request that contains a list of DB UPDATE or INSERT operations and return a result summary in the response.

I can write a ReactiveCrudRepository-based repository for each individual DB operation. What I don't know is how to write the service that runs the whole list of operations as a group and composes the result summary.

I am new to reactive programming in Java. Thanks for any suggestions and help.

Chen


Solution

  • I got the hint from https://www.vinsguru.com/spring-webflux-aggregation/ . The idea is:

    1. Create three Monos from the request:
       • Mono<List<Long>> monoEndDateSet -- DB row ids touched by the update (end-date) operations;
       • Mono<List<Long>> monoCreateList -- DB row ids of the newly inserted rows;
       • Mono<ChangeSupplyResponse> monoRespFilled -- the response with the already-known fields filled in.
    2. Use Mono.zip to aggregate the three Monos, then map the resulting Tuple3 into the Mono<ChangeSupplyResponse> that is returned.

    The key parts of the code are below:

        public Mono<ChangeSupplyResponse> ChangeSupplies(ChangeSupplyRequest csr){
            // Pre-fill the response fields that are already known from the request.
            ChangeSupplyResponse resp = ChangeSupplyResponse.builder().build();
            resp.setEventType(csr.getEventType());
            resp.setSupplyOperationId(csr.getSupplyOperationId());
            resp.setTeamMemberId(csr.getTeamMemberId());
            resp.setRequestTimeStamp(csr.getTimestamp());
            resp.setProcessStart(OffsetDateTime.now());
            resp.setUserId(csr.getUserId());
    
            // One Mono per result that the summary needs.
            Mono<List<Long>> monoEndDateSet = getEndDateIdList(csr);
            Mono<List<Long>> monoCreateList = getNewSupplyEntityList(csr);
            Mono<ChangeSupplyResponse> monoRespFilled = Mono.just(resp);
    
            // Zip the three Monos into a Tuple3, merge it into a single response,
            // and wrap the whole pipeline in one transaction
            // (operator is presumably a TransactionalOperator).
            return Mono.zip(monoRespFilled, monoEndDateSet, monoCreateList)
                    .map(this::combine)
                    .as(operator::transactional);
        }
    
    
        private ChangeSupplyResponse combine(Tuple3<ChangeSupplyResponse, List<Long>, List<Long>> tuple){
            ChangeSupplyResponse resp = tuple.getT1().toBuilder().build();
            List<Long> endDateIds = tuple.getT2();
            resp.setEndDatedDemandStreamSupplyIds(endDateIds);
    
            List<Long> newIds = tuple.getT3();
            resp.setNewCreatedDemandStreamSupplyIds(newIds);
            
            resp.setSuccess(true);
            Duration span = Duration.between(resp.getProcessStart(), OffsetDateTime.now());
            resp.setProcessDurationMillis(span.toMillis());
            return resp;
        }
    
        private Mono<List<Long>> getNewSupplyEntityList(ChangeSupplyRequest csr) {
            // Start from an empty Flux and merge in one save(...) per createSupply operation.
            Flux<DemandStreamSupplyEntity> fluxNewCreated = Flux.empty();
            for (SrmOperation so : csr.getOperations()) {
                if (so.getType() == SrmOperationType.createSupply) {
                    DemandStreamSupplyEntity e = buildEntity(so, csr);
                    fluxNewCreated = fluxNewCreated.mergeWith(this.demandStreamSupplyRepository.save(e));
                }
            }
    
            // Collect the ids of the saved entities; an empty list is emitted if nothing was created.
            return fluxNewCreated.map(DemandStreamSupplyEntity::getDemandStreamSupplyId).collectList();
        }
    

    ...
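
The zip-and-combine pattern above can also be tried in isolation. The following is a minimal sketch that assumes only reactor-core on the classpath (Java 16+ for records). Op, Summary, fakeSave, idsFor and process are hypothetical names that stand in for the real request, response and repository; they are not part of the original application. It also swaps the mergeWith loop for Flux.fromIterable with concatMap, which is one possible alternative and not the author's code.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ZipSketch {
    // Hypothetical stand-ins for the real operation and response types.
    record Op(String type, long id) {}
    record Summary(String eventType, List<Long> updatedIds, List<Long> createdIds) {}

    // Stand-in for repository.save(...): lazily emits the "saved" row id.
    static Mono<Long> fakeSave(Op op) {
        return Mono.fromSupplier(() -> op.id());
    }

    // Run every operation of the given type and collect the resulting ids.
    // concatMap subscribes to one save at a time, so ids keep the request order.
    static Mono<List<Long>> idsFor(List<Op> ops, String type) {
        return Flux.fromIterable(ops)
                .filter(op -> op.type().equals(type))
                .concatMap(ZipSketch::fakeSave)
                .collectList();
    }

    // Zip the known field and the two id lists, then map the Tuple3 to a summary.
    static Mono<Summary> process(String eventType, List<Op> ops) {
        Mono<List<Long>> updated = idsFor(ops, "update");
        Mono<List<Long>> created = idsFor(ops, "create");
        return Mono.zip(Mono.just(eventType), updated, created)
                .map(t -> new Summary(t.getT1(), t.getT2(), t.getT3()));
    }

    public static void main(String[] args) {
        List<Op> ops = List.of(
                new Op("update", 1), new Op("create", 10),
                new Op("update", 2), new Op("create", 11));
        // block() is only for this demo; a WebFlux controller would return the Mono.
        System.out.println(process("changeSupply", ops).block());
    }
}
```

Two details matter for this pattern. Mono.zip completes empty if any of its sources completes empty, but collectList always emits a list (an empty one when no operation matches), so the zip still produces a summary. And concatMap runs the saves one after another, whereas mergeWith subscribes to them eagerly; which behavior is preferable inside a single R2DBC transaction depends on the driver and is worth testing.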