Tags: spring-batch, nonblocking

How to make a non-blocking item processor in Spring Batch (not just asynchronous with a TaskExecutor)?


Spring Batch has a facility called AsyncItemProcessor. It simply wraps an ItemProcessor and runs it on a TaskExecutor, so it can run asynchronously. I want to make a REST call inside this ItemProcessor; the problem is that every thread in the TaskExecutor that makes the call will block until the response is received. I want to make it non-blocking, in the style of the reactive paradigm.

I have an ItemProcessor that calls a REST endpoint and gets its response:

    @Bean
    public ItemProcessor<String, String> testItemProcessor() {
        return item -> {
            // long-running call: this thread blocks until the response arrives
            String url = "http://localhost:8787/test";
            return restTemplate.getForObject(url, String.class);
        };
    }

Now I wrap it with AsyncItemProcessor:

    @Bean
    public AsyncItemProcessor<String, String> testAsyncItemProcessor() throws Exception {
        AsyncItemProcessor<String, String> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(testItemProcessor());
        asyncItemProcessor.setTaskExecutor(testThreadPoolTaskExecutor());
        asyncItemProcessor.afterPropertiesSet();
        return asyncItemProcessor;
    }

    @Bean
    public ThreadPoolTaskExecutor testThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(50);
        threadPoolTaskExecutor.setMaxPoolSize(100);
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        return threadPoolTaskExecutor;
    }

I used a ThreadPoolTaskExecutor as the TaskExecutor.

This is the ItemWriter:

    @Bean
    public ItemWriter<String> testItemWriter() {
        return items -> {
            // I write them to a file and a database, but for simplicity:
            for (String item : items) {
                System.out.println(item);
            }
        };
    }

    @Bean
    public AsyncItemWriter<String> asyncTestItemWriter() throws Exception {
        AsyncItemWriter<String> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(testItemWriter());
        asyncItemWriter.afterPropertiesSet();
        return asyncItemWriter;
    }
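For context, the Async pair coordinates through Futures: AsyncItemProcessor submits the delegate call to the TaskExecutor and passes the resulting Future down the step, and AsyncItemWriter unwraps each Future with a blocking get() before delegating to the real writer. A minimal JDK-only sketch of that handoff (class and item names are made up for illustration):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class AsyncPairSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // "AsyncItemProcessor": submit each item and emit a Future instead of a value
        List<String> items = List.of("a", "b", "c");
        List<Future<String>> futures = items.stream()
                .map(item -> executor.submit(() -> "processed-" + item))
                .collect(Collectors.toList());

        // "AsyncItemWriter": unwrap each Future, blocking until it completes
        for (Future<String> future : futures) {
            System.out.println(future.get());
        }
        executor.shutdown();
    }
}
```

Note that the get() calls are exactly where blocking reappears: the work moves off the step thread onto the pool, but some thread still waits on each result.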

The step and job configuration:

    @Bean
    public Step testStep() throws Exception {
        return stepBuilderFactory.get("testStep")
                .<String, String>chunk(1000)
                .reader(testItemReader())
                .processor(testAsyncItemProcessor())
                .writer(asyncTestItemWriter())
                .build();
    }


    @Bean
    public Job testJob() throws Exception {
        return jobBuilderFactory.get("testJob")
                .start(testStep())
                .build();
    }

The ItemReader is a simple ListItemReader:

    @Bean
    public ItemReader<String> testItemReader() {
        List<String> items = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            items.add(String.valueOf(i));
        }
        return new ListItemReader<>(items);
    }

Now I have a ThreadPoolTaskExecutor with 50 to 100 threads. Each thread makes a REST call inside the ItemProcessor and blocks until it receives the response from the server. Is there a way to make these calls non-blocking? If so, how should I design the ItemWriter? Inside the ItemWriter I want to write the results from the ItemProcessor to a file and a database. Each chunk has a size of 1000; I can wait until all the records inside it have been processed, but I don't want to block one thread per REST call within the chunk. Is there any way to accomplish that?

I know that Spring's RestTemplate is what makes the process blocking and that WebClient should be used instead, but is there an equivalent component in Spring Batch (instead of AsyncItemProcessor/AsyncItemWriter) for WebClient?


Solution

  • No, there is no support for reactive programming in Spring Batch yet; there is an open feature request here: https://github.com/spring-projects/spring-batch/issues/1008.

    Please note that going reactive means the entire stack must be reactive, from batch artefacts (reader, processor, writer, listeners, etc.) to infrastructure beans (job repository, transaction manager, etc.), not only your item processor and writer.

    Moreover, the current chunk processing model is actually incompatible with the reactive paradigm. The reason is that a ChunkOrientedTasklet basically uses two collaborators:

    • A ChunkProvider which provides chunks of items (delegating item reading to an ItemReader)
    • A ChunkProcessor which processes chunks (delegating processing and writing to an ItemProcessor and an ItemWriter, respectively)

    Here is a simplified version of the code:

    Chunk inputs = chunkProvider.provide();
    chunkProcessor.process(inputs);
    

    As you can see, the step waits for the chunkProcessor (processor + writer) to finish the whole chunk before reading the next one. So in your case, even if you use non-blocking APIs in your processor and writer, your step will still wait for the chunk to be completely processed before reading the next chunk (on top of the blocking interactions with the job repository and transaction manager).
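    Given that constraint, one pragmatic middle ground (a pattern under the current model, not official reactive support) is to accept the per-chunk wait but stop dedicating a thread per call: fire all requests for a chunk asynchronously (for example with WebClient or the JDK's HttpClient.sendAsync) and join once per chunk. A JDK-only sketch of that aggregation, with the HTTP call stubbed out by a hypothetical fetchAsync:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ChunkAggregation {
    // Stand-in for a non-blocking call such as WebClient or HttpClient.sendAsync
    static CompletableFuture<String> fetchAsync(String item) {
        return CompletableFuture.supplyAsync(() -> "response-" + item);
    }

    public static void main(String[] args) {
        List<String> chunk = List.of("1", "2", "3");

        // Fire all calls up front without blocking; every request is in flight
        // before we start waiting on any of them
        List<CompletableFuture<String>> futures = chunk.stream()
                .map(ChunkAggregation::fetchAsync)
                .collect(Collectors.toList());

        // One wait per chunk: join the already-running futures in order
        List<String> responses = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());

        responses.forEach(System.out::println);
    }
}
```

    This does not make the step reactive, but it bounds the blocking to one join per chunk instead of one blocked thread per item, which is the most the chunk-oriented model allows today.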