java, future, completable-future, forkjoinpool

How to run an operation with potential DB interactions in parallel over a list


I'm trying to make my code more efficient, so I want to understand how to make it work with Futures and ForkJoinPool.

For now I have code that works like this:

@RestController
@RequestMapping(SEND)
@Slf4j
public class InputMessageController {
private HandlerService service;
    @ApiResponses(value = {
            @ApiResponse(code = 200, message = "message sent"),
            @ApiResponse(code = 404, message = "channel not found or inactive"),
    })
    @RequestMapping(value = "/{channel}", method = RequestMethod.POST)
    @ResponseStatus(HttpStatus.OK)
    public List<ResponseDto> sendNotification(
            @Valid @RequestBody @NotNull List<PayloadInfoDto> requestDtoList,
            @PathVariable("channel") String url) throws InputChannelInactiveException {

        InputChannel channel = mapService.getChannelByUrl(url);

        if (channel != null){
            return service.processing(requestDtoList, channel);
        } else {
            loggerService.logChannelNotFound();
            throw new InputChannelInactiveException(url);
        }
    }
}

public class HandlerServiceImpl implements HandlerService {
    public List<ResponseDto> processing(List<PayloadInfoDto> requestDtoList, InputChannel channel) {

        List<ResponseDto> responseDtoList = new ArrayList<>();

//this one takes quite long. would be better if it was done in multiple threads
        requestDtoList.forEach(inputRequestDto -> responseDtoList.add(processNorm(inputRequestDto, channel)));

        return responseDtoList;
    }
}

ResponseDto processNorm(PayloadInfoDto inputRequestDto, InputChannel channel) {
        RequestMessage msg;

        //call to multiple services which can throw an exception. Each exception is processed.
        //call to logger service which logs info to database

}

My questions are:

  1. For the logger service, which writes to the database: would it be better to write something like the code below instead of a regular save to the DB, especially if we don't want to wait for the DB write and want to move on?
private ExecutorService executor = Executors.newWorkStealingPool();
///....some code
public void exampleAsyncLog() {
    // I don't want to wait while the logger service writes to the DB. It can take as long as it needs
    // while I move on.
    executor.submit(() -> saveSomethingToDb()); // saveSomethingToDb() is a trivial logRepository.save(newEntity)
}
  2. I haven't figured out how to run CompletableFuture/ForkJoinPool on a List, at least in a way that a) takes a list as the argument, b) processes each element of the list in parallel, and c) produces a list as the end result. All the examples I've encountered build a list of CompletableFuture objects and then call allOf(), like in this blogpost. Is that the correct way to produce a list of results from a list using futures? Or is there a better approach?

Solution

  • I'd suggest using CompletableFuture for this task, because you want to proceed only once every item has been processed and saved to the DB, which is the safest option here. The code would look something like this:

    public class HandlerServiceImpl implements HandlerService {
        public List<ResponseDto> processing(List<PayloadInfoDto> requestDtoList, InputChannel channel) {

            // Start one asynchronous task per element so the slow processNorm calls can overlap.
            List<CompletableFuture<ResponseDto>> futures = requestDtoList.stream()
                    .map(inputRequestDto -> CompletableFuture.supplyAsync(() -> processNorm(inputRequestDto, channel)))
                    .collect(Collectors.toList());

            // This code is not tested. It shows the logic to implement.
            // join() waits for each task and, unlike get(), throws no checked exceptions.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        }
    }
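
For reference, here is a minimal, self-contained sketch of the "list in, list out" pattern from the question, using a list of futures plus allOf(). It assumes Java 9+ (for List.of). The class name ParallelListDemo, the helper processAll, and the pool size of 4 are hypothetical and chosen only for illustration. The sketch uses a dedicated fixed thread pool instead of the common ForkJoinPool, on the assumption that the tasks block on DB calls; that is a design choice, not something the original code requires.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ParallelListDemo {

    // Hypothetical helper: applies `task` to every element on `pool` and
    // returns the results in the same order as the input list.
    public static <T, R> List<R> processAll(List<T> inputs, Function<T, R> task, ExecutorService pool) {
        // Submit every element first so the tasks can run concurrently.
        List<CompletableFuture<R>> futures = inputs.stream()
                .map(input -> CompletableFuture.supplyAsync(() -> task.apply(input), pool))
                .collect(Collectors.toList());

        // allOf completes once every future has completed. join() blocks until then
        // and throws CompletionException if any task failed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        // Every future is already done here, so these join() calls do not block.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Pool size 4 is an arbitrary choice for this sketch.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<String> results = processAll(List.of("a", "b", "c"), String::toUpperCase, pool);
            System.out.println(results); // prints [A, B, C]
        } finally {
            pool.shutdown();
        }
    }
}
```

In the service from the question, the call would presumably look like processAll(requestDtoList, dto -> processNorm(dto, channel), pool), with the pool created once and reused across requests rather than per call.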