
Can an `ItemReader` in Spring Batch wait until data is available for processing, similar to a blocking queue?


Currently I use the following strategy to process items in a step:

```java
TaskletStep processingStep = stepBuilderFactory.get(getLabel() + "-" + UUID.randomUUID().toString())
        .<Object, Object>chunk(configuration.getChunkSize())
        .reader(reader)
        .processor(processor)
        .writer(writer)
        .transactionManager(txManager)
        .build();
TypedJobParameters typedJobParameters = new TypedJobParameters();
runStep(processingStep, typedJobParameters);
```

This step also does additional work, such as compressing the file and copying it to a different location, so it takes a long time to complete. How can I offload this additional work to background threads?
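For concreteness, the "additional work" described above might look something like the following plain-Java sketch, which gzip-compresses a file and writes the compressed copy to a different directory. The class `FileArchiver`, the method `compressAndCopy`, and the `.gz` naming are hypothetical, not taken from the original code; the point is only that this is blocking I/O, which is the kind of work worth moving off the step's thread.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPOutputStream;

class FileArchiver {
    // Hypothetical helper: gzip-compresses `source` into `targetDir/<file name>.gz`
    // and returns the path of the compressed copy; the source file is left in place.
    static Path compressAndCopy(Path source, Path targetDir) throws IOException {
        Files.createDirectories(targetDir); // no-op if the directory already exists
        Path target = targetDir.resolve(source.getFileName() + ".gz");
        try (OutputStream out = new GZIPOutputStream(Files.newOutputStream(target))) {
            Files.copy(source, out); // streams the file's bytes through the gzip encoder
        }
        return target;
    }
}
```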

If a background thread keeps polling until a new file arrives for compression, it wastes CPU cycles; on the other hand, making the thread wait and notifying it when a new file arrives makes the design more complex.

How can I start a new step in parallel with the existing step above, so that the `ItemReader` of the new step waits until a file arrives for processing, the way a blocking queue does?
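The waiting behaviour asked about here can be sketched with a `java.util.concurrent.BlockingQueue`: `take()` parks the reading thread until a producer (for example, whatever detects a new file) puts an element on the queue, so it neither burns CPU like a polling loop nor needs hand-written `wait`/`notify` code. To keep the sketch self-contained, it declares a local `SimpleItemReader` interface with the same shape as Spring Batch's `ItemReader.read()` (return the next item, or `null` when input is exhausted); in a real job the class would implement `ItemReader<Path>` instead. The class name `BlockingQueueReader` and the `END_OF_INPUT` sentinel are hypothetical conventions, not part of Spring Batch.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in with the same shape as Spring Batch's ItemReader<T>:
// read() returns the next item, or null once there is no more input.
interface SimpleItemReader<T> {
    T read() throws Exception;
}

// Reader whose read() blocks, without busy-waiting, until a producer enqueues a file.
class BlockingQueueReader implements SimpleItemReader<Path> {
    // Hypothetical "poison pill" that producers enqueue to signal that no more files will arrive.
    // It is compared by identity, so a real file with the same name is never mistaken for it.
    static final Path END_OF_INPUT = Paths.get("__end_of_input__");

    private final BlockingQueue<Path> queue;
    private boolean finished = false;

    BlockingQueueReader(BlockingQueue<Path> queue) {
        this.queue = queue;
    }

    @Override
    public Path read() throws InterruptedException {
        if (finished) {
            return null; // keep signalling end of input on later calls
        }
        Path next = queue.take(); // parks this thread until an element is available
        if (next == END_OF_INPUT) {
            finished = true;
            return null; // null tells the caller (in Spring Batch, the step) that input is exhausted
        }
        return next;
    }
}
```

A producer would call `queue.put(path)` for each new file and `queue.put(BlockingQueueReader.END_OF_INPUT)` once no more files will arrive. One caveat: in a chunk-oriented step the read happens inside the chunk's transaction, so a reader that blocks for a long time may run into transaction timeouts.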


Solution

  • You can delegate the "expensive" work to background threads by wrapping your processor in an `AsyncItemProcessor` (from the `spring-batch-integration` module). Give it a `TaskExecutor` backed by a thread pool and your existing processor as the delegate; the delegate then does the actual work on a background thread.

The reader still reads items on the step's thread, but each item is submitted to the task executor, so the items of a chunk are processed concurrently. An `AsyncItemWriter` then waits for each resulting `Future` and hands the completed results to your delegate writer.

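```java
import java.util.UUID;
import java.util.concurrent.Future;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;

// Runs the delegate processor on a taskExecutor thread and returns a Future per item
AsyncItemProcessor<Object, Object> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(processor);
asyncItemProcessor.setTaskExecutor(taskExecutor); // e.g. a ThreadPoolTaskExecutor

// Waits for each Future and passes the unwrapped results to the real writer
AsyncItemWriter<Object> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(writer);

// The processor now emits Future<Object>, so the chunk's output type changes accordingly
TaskletStep processingStep = stepBuilderFactory.get(getLabel() + "-" + UUID.randomUUID())
        .<Object, Future<Object>>chunk(configuration.getChunkSize())
        .reader(reader)
        .processor(asyncItemProcessor)
        .writer(asyncItemWriter)
        .transactionManager(txManager)
        .build();
TypedJobParameters typedJobParameters = new TypedJobParameters();
runStep(processingStep, typedJobParameters);
```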
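To see why wrapping the processor and writer this way speeds up a step, here is a simplified plain-Java model of the pattern for a single chunk. It is an illustration only, not Spring Batch's actual implementation: every item is submitted to an `ExecutorService` right away (roughly what `AsyncItemProcessor` does with its `TaskExecutor`), and the results are then collected by waiting on each `Future` in turn (roughly what `AsyncItemWriter` does before calling its delegate). The names `AsyncChunkModel` and `processChunk` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

// Simplified model (not Spring's code) of AsyncItemProcessor + AsyncItemWriter for one chunk.
class AsyncChunkModel {
    static <I, O> List<O> processChunk(List<I> chunk, Function<I, O> delegateProcessor,
                                       ExecutorService executor) throws Exception {
        // "Processor" side: submit every item immediately; each submit returns a Future at once
        List<Future<O>> futures = new ArrayList<>();
        for (I item : chunk) {
            futures.add(executor.submit(() -> delegateProcessor.apply(item)));
        }
        // "Writer" side: wait for each Future in item order and collect the results
        List<O> results = new ArrayList<>();
        for (Future<O> future : futures) {
            results.add(future.get()); // blocks until that item's background work has finished
        }
        return results;
    }
}
```

In this model the items of a chunk run concurrently, so a chunk takes roughly as long as its slowest item rather than the sum of all items (given enough pool threads), while the writer still receives the results in the original item order.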