I need to share relatively large amounts of data between job steps in a Spring Batch implementation. I am aware of StepExecutionContext and JobExecutionContext as mechanisms for this. However, I have read that these must be limited in size (less than 2500 characters), which is too small for my needs. In my novice one-step Spring Batch implementation, my single-step job looks like this:
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // Query that feeds the cursor reader.
    private static final String GET_DATA =
            "SELECT " +
            "stuffA, " +
            "stuffB " +
            "FROM STUFF_TABLE " +
            "ORDER BY stuffA ASC";

    @Bean
    public ItemReader<StuffDto> databaseCursorItemReader(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<StuffDto>()
                .name("cursorItemReader")
                .dataSource(dataSource)
                .sql(GET_DATA)
                .rowMapper(new BeanPropertyRowMapper<>(StuffDto.class))
                .build();
    }

    // Defined as a bean, but not attached to the step below.
    @Bean
    ItemProcessor<StuffDto, StuffDto> databaseXmlItemProcessor() {
        return new QueryLoggingProcessor();
    }

    @Bean
    public ItemWriter<StuffDto> databaseCursorItemWriter() {
        return new LoggingItemWriter();
    }

    // Single chunk-oriented step: read one row, write one row.
    @Bean
    public Step databaseCursorStep(@Qualifier("databaseCursorItemReader") ItemReader<StuffDto> reader,
                                   @Qualifier("databaseCursorItemWriter") ItemWriter<StuffDto> writer,
                                   StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("databaseCursorStep")
                .<StuffDto, StuffDto>chunk(1)
                .reader(reader)
                .writer(writer)
                .build();
    }

    @Bean
    public Job databaseCursorJob(@Qualifier("databaseCursorStep") Step exampleJobStep,
                                 JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("databaseCursorJob")
                .incrementer(new RunIdIncrementer())
                .flow(exampleJobStep)
                .end()
                .build();
    }
}
This works fine in the sense that I can successfully read from the database and, in the writer, log the items with a LoggingItemWriter like this:
public class LoggingItemWriter implements ItemWriter<StuffDto> {

    private static final Logger LOGGER = LoggerFactory.getLogger(LoggingItemWriter.class);

    @Override
    public void write(List<? extends StuffDto> list) throws Exception {
        LOGGER.info("Writing stuff: {}", list);
    }
}
However, I need to make that StuffDto (or equivalent) and its data available to a second step that would process it rather than just log it.
I would be grateful for any ideas on how that could be accomplished if you assume that the step and job contexts are too limited. Thanks.
If you do not want to write the data to a database or the filesystem, one way to achieve this is as follows:
1. Create a class holding the required properties (JobContext in the sample below) and register it as a bean annotated with @JobScope.
2. Implement the org.springframework.batch.core.step.tasklet.Tasklet interface in your reader, processor and writer classes. If you want more control over the steps, you can also implement org.springframework.batch.core.StepExecutionListener alongside it.
3. Inject the context object using @Autowired and use its getters and setters to store and retrieve the data.
Sample code:
Config.java
@Configuration
@EnableBatchProcessing
@EnableScheduling // needed somewhere in the application for @Scheduled to fire
public class Config {

    @Autowired
    private SampleReader reader;

    @Autowired
    private SampleProcessor processor;

    // SampleWriter is assumed to follow the same pattern as SampleReader and SampleProcessor.
    @Autowired
    private SampleWriter writer;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobLauncher monitorJobLauncher;

    // One JobContext instance per job execution, shared by all steps of that execution.
    @Bean
    @JobScope
    public JobContext getJobContexts() {
        return new JobContext();
    }

    @Bean
    public Step reader() {
        return stepBuilderFactory.get("reader")
                .tasklet(reader)
                .build();
    }

    @Bean
    public Step processor() {
        return stepBuilderFactory.get("processor")
                .tasklet(processor)
                .build();
    }

    @Bean
    public Step writer() {
        return stepBuilderFactory.get("writer")
                .tasklet(writer)
                .build();
    }

    public Job testJob() {
        return jobBuilderFactory.get("testJob")
                .start(reader())
                .next(processor())
                .next(writer())
                .build();
    }

    // Starts the job; the timestamp parameter makes every launch a new job instance.
    @Scheduled(fixedRate = 1000)
    public void startJob() throws Exception {
        Map<String, JobParameter> confMap = new HashMap<>();
        confMap.put("time", new JobParameter(new Date()));
        JobParameters jobParameters = new JobParameters(confMap);
        monitorJobLauncher.run(testJob(), jobParameters);
    }
}
JobContext.java
public class JobContext {
    private List<StuffDto> dataToProcess = new ArrayList<>();
    private List<StuffDto> dataToWrite = new ArrayList<>();
    // getters and setters
}
SampleReader.java
@Component
public class SampleReader implements Tasklet, StepExecutionListener {

    @Autowired
    private JobContext context;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Logic that you need to perform before the execution of this step.
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Logic that you need to perform after the execution of this step.
        // Returning null leaves the step's exit status unchanged.
        return null;
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        // Whatever code is here will get executed for the reader.
        // Fetch the StuffDto objects from the database and add them to the
        // JobContext dataToProcess list.
        return RepeatStatus.FINISHED;
    }
}
SampleProcessor.java
@Component
public class SampleProcessor implements Tasklet {

    @Autowired
    private JobContext context;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        // Whatever code is here will get executed for the processor.
        // context.getDataToProcessList();
        // Apply the business logic and set the data to write.
        return RepeatStatus.FINISHED;
    }
}
The writer class follows the same pattern.
Note: with this approach you need to write the database-related boilerplate code yourself. In return, you get more control over your logic and do not have to worry about the context size limit. All the data is held in memory, so it becomes eligible for garbage collection once the job finishes and the job-scoped context is discarded. I hope this conveys the idea.
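To make the data flow easier to see without any Spring wiring, here is a minimal framework-free sketch of the same idea. It is only a model, not Spring Batch code: SharedHolderSketch, Holder, Step, runJob and runSample are hypothetical names, strings stand in for StuffDto, and the upper-casing is a placeholder for real business logic. Each call to runJob creates a fresh holder, which roughly mirrors what @JobScope gives you per job execution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Framework-free model: three sequential "steps" exchange data through one
// shared in-memory holder. All names are hypothetical stand-ins.
final class SharedHolderSketch {

    // Plays the role of the job-scoped JobContext bean.
    static final class Holder {
        final List<String> dataToProcess = new ArrayList<>();
        final List<String> dataToWrite = new ArrayList<>();
    }

    // Stand-in for a tasklet: each step only sees the shared holder.
    interface Step {
        void execute(Holder holder);
    }

    // Runs the steps in order against a fresh holder, like one job execution.
    static Holder runJob(List<Step> steps) {
        Holder holder = new Holder();
        for (Step step : steps) {
            step.execute(holder);
        }
        return holder;
    }

    // Builds a reader -> processor -> writer chain and returns what the writer emitted.
    static List<String> runSample() {
        List<String> written = new ArrayList<>();

        // "Reader": loads the input into the holder.
        Step reader = holder -> holder.dataToProcess.addAll(Arrays.asList("stuffA", "stuffB"));

        // "Processor": transforms the input and stores the result in the holder.
        Step processor = holder -> {
            for (String item : holder.dataToProcess) {
                holder.dataToWrite.add(item.toUpperCase(Locale.ROOT));
            }
        };

        // "Writer": consumes the processed data from the holder.
        Step writer = holder -> written.addAll(holder.dataToWrite);

        runJob(Arrays.asList(reader, processor, writer));
        return written;
    }

    public static void main(String[] args) {
        System.out.println(runSample()); // prints [STUFFA, STUFFB]
    }
}
```

Because nothing here is serialized, the only size limit is the heap, which is the trade-off the note above describes.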
To read more about Tasklet vs. Chunk, read this.