I am new to Spring Batch and am implementing a job that has to pull a huge data set from a database and write it to files. Below is the sample job configuration, which is working as expected for me.
@Bean
public Job customDBReaderFileWriterJob() throws Exception {
    return jobBuilderFactory.get(MY_JOB)
            .incrementer(new RunIdIncrementer())
            .flow(partitionGenerationStep())
            .next(cleanupStep())
            .end()
            .build();
}

@Bean
public Step partitionGenerationStep() throws Exception {
    return stepBuilderFactory
            .get("partitionGenerationStep")
            .partitioner("Partitioner", partitioner())
            .step(multiOperationStep())
            .gridSize(50)
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Step multiOperationStep() throws Exception {
    return stepBuilderFactory
            .get("MultiOperationStep")
            .<Input, Output>chunk(100)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}

@Bean
@StepScope
public DBPartitioner partitioner() {
    DBPartitioner dbPartitioner = new DBPartitioner();
    dbPartitioner.setColumn(ID);
    dbPartitioner.setDataSource(dataSource);
    dbPartitioner.setTable(TABLE);
    return dbPartitioner;
}

@Bean
@StepScope
public Reader reader() {
    return new Reader();
}

@Bean
@StepScope
public Processor processor() {
    return new Processor();
}

@Bean
@StepScope
public Writer writer() {
    return new Writer();
}

@Bean
public Step cleanupStep() {
    return stepBuilderFactory.get("cleanupStep")
            .tasklet(cleanupTasklet())
            .build();
}

@Bean
@StepScope
public CleanupTasklet cleanupTasklet() {
    return new CleanupTasklet();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(10);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("MultiThreaded-");
    return executor;
}
As the data set is huge, I have configured the task executor's thread pool size as 10 and the grid size as 50. With this setup, 10 threads write to 10 files at a time, and the reader reads in chunks, so the reader-processor-writer flow iterates multiple times (for a group of 10 partitions, before moving on to the next partitions).

Now, I would like to add a tasklet that compresses the files once all iterations (read, process, write) for one thread are completed, i.e. after the completion of each partition.

I do have a cleanup tasklet that runs last, but putting the compression logic there would mean waiting for all files generated by every partition before performing the compression. Please suggest.
You can change your worker step multiOperationStep to be a FlowStep of a chunk-oriented step followed by a simple tasklet step where you do the compression. In other words, the worker step is actually two steps combined in one FlowStep.
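A minimal sketch of that restructuring, reusing the stepBuilderFactory, Input/Output types, and reader/processor/writer beans from your question (CompressionTasklet is a hypothetical tasklet you would implement to zip the file produced by the current partition):

@Bean
public Step multiOperationStep() throws Exception {
    // Worker step for the partitioner: a flow made of the chunk-oriented
    // step followed by the compression tasklet step, exposed as one FlowStep.
    Flow flow = new FlowBuilder<SimpleFlow>("multiOperationFlow")
            .start(chunkStep())
            .next(compressionStep())
            .build();
    return stepBuilderFactory
            .get("MultiOperationStep")
            .flow(flow)
            .build();
}

@Bean
public Step chunkStep() throws Exception {
    // The former body of multiOperationStep: read/process/write in chunks.
    return stepBuilderFactory
            .get("chunkStep")
            .<Input, Output>chunk(100)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}

@Bean
public Step compressionStep() {
    // Runs once per partition, right after that partition's chunk step finishes.
    return stepBuilderFactory
            .get("compressionStep")
            .tasklet(compressionTasklet())
            .build();
}

@Bean
@StepScope
public CompressionTasklet compressionTasklet() {
    // Hypothetical tasklet: being step-scoped, it can read the partition's
    // file name from the step execution context (however your DBPartitioner
    // exposes it) to know which file to compress.
    return new CompressionTasklet();
}

Since the whole FlowStep runs inside each partition's StepExecution, the compression tasklet executes as soon as that partition's chunk step completes, without waiting for the other partitions. The imports needed are org.springframework.batch.core.job.builder.FlowBuilder, org.springframework.batch.core.job.flow.Flow and org.springframework.batch.core.job.flow.support.SimpleFlow.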