At present I am following the below strategy for processing items in a step
.
TaskletStep processingStep = stepBuilderFactory.get(getLabel() + "-" + UUID.randomUUID().toString())
.<Object, Object>chunk(configuration.getChunkSize())
.reader(reader)
.processor(processor)
.writer(writer).transactionManager(txManager).build();
TypedJobParameters typedJobParameters = new TypedJobParameters();
runStep(processingStep, typedJobParameters);
This Task Step
does some additional work too like compressing the file and copying it to a different location therefore it took so long time to complete. How can I offload this additional work to background threads.
If background thread keep polling till new file arrives for compression then it may consume more CPU cycles
whereas if we can put that thread on wait and notify it when new file arrives then it will become more complex.
How can I start a new TaskStep
parallel to my existing above TaskStep
in such way that ItemReader
of that new TaskStep
wait until the point in time when the file arrives for processing like blocking queues
?
You can delegate "expensive" work to background thread if you define your processor as an AsyncItemProcessor. You can assign task executor to it with thread pool and delegate processor which will do actual work in background thread.
Item reader will accept other files and will assign them to threads in task executor. When background thread completes processing of file it will be then assigned back to writer.
AsyncProcessor asyncProcessor = new AsyncProcessor();
asyncProcessor.setDelegate(processor);
asyncProcessor.setTaskExecutor(taskExecutor);
AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
asyncItemWriter.setDelegate(writer);
TaskletStep processingStep = stepBuilderFactory.get(getLabel() + "-" + UUID.randomUUID().toString())
.<Object, Object>chunk(configuration.getChunkSize())
.reader(reader)
.processor(asyncProcessor)
.writer(asyncWriter).transactionManager(txManager).build();
TypedJobParameters typedJobParameters = new TypedJobParameters();
runStep(processingStep, typedJobParameters);