Tags: java, spring, spring-batch

SpringBatch Sharing Large Amounts of Data Between Steps


I need to share relatively large amounts of data between job steps in a Spring Batch implementation. I am aware of StepExecutionContext and JobExecutionContext as mechanisms for this, but I have read that these are limited in size (to fewer than 2,500 characters), which is too small for my needs. My novice single-step Spring Batch job is configured as below:

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;


    private static final String GET_DATA =
        "SELECT " +
        "stuffA, " +
        "stuffB " +
        "FROM STUFF_TABLE " +
        "ORDER BY stuffA ASC";

    @Bean
    public ItemReader<StuffDto> databaseCursorItemReader(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<StuffDto>()
            .name("cursorItemReader")
            .dataSource(dataSource)
            .sql(GET_DATA)
            .rowMapper(new BeanPropertyRowMapper<>(StuffDto.class))
            .build();
    }

    @Bean
    ItemProcessor<StuffDto, StuffDto> databaseXmlItemProcessor() {
        return new QueryLoggingProcessor();
    }

    @Bean
    public ItemWriter<StuffDto> databaseCursorItemWriter() {
        return new LoggingItemWriter();
    }

    @Bean
    public Step databaseCursorStep(@Qualifier("databaseCursorItemReader") ItemReader<StuffDto> reader,
                                   @Qualifier("databaseCursorItemWriter") ItemWriter<StuffDto> writer,
                                   StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("databaseCursorStep")
            .<StuffDto, StuffDto>chunk(1)
            .reader(reader)
            .writer(writer)
            .build();
    }

    @Bean
    public Job databaseCursorJob(@Qualifier("databaseCursorStep") Step exampleJobStep,
                                 JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("databaseCursorJob")
            .incrementer(new RunIdIncrementer())
            .flow(exampleJobStep)
            .end()
            .build();
    }
}

This works fine in the sense that I can successfully read from the database and, in the writer step, write to a LoggingItemWriter like this:

public class LoggingItemWriter implements ItemWriter<StuffDto> {

    private static final Logger LOGGER = LoggerFactory.getLogger(LoggingItemWriter.class);

    @Override
    public void write(List<? extends StuffDto> list) throws Exception {
        LOGGER.info("Writing stuff: {}", list);
    }
}

However, I need to make that StuffDto (or equivalent) and its data available to a second step that would perform some processing against it rather than just logging it.

I would be grateful for any ideas on how that could be accomplished, assuming the step and job execution contexts are too limited. Thanks.


Solution

  • If you do not want to write the data to the database or filesystem, one way to achieve this is as follows:

    1. Create your own job context bean in your config class with the required properties, and annotate it with @JobScope.
    2. Implement the org.springframework.batch.core.step.tasklet.Tasklet interface in your reader, processor and writer classes. If you want more control over the steps, you can also implement org.springframework.batch.core.StepExecutionListener alongside it.
    3. Autowire your context object and use its setters and getters to store and retrieve the data.

    Sample Code:

    Config.java

    @Autowired
    private SampleProcessor processor;

    @Autowired
    private SampleReader reader;

    @Autowired
    private SampleWriter writer;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobLauncher monitorJobLauncher;

    @Bean
    @JobScope
    public JobContext getJobContexts() {
        return new JobContext();
    }

    @Bean
    public Step reader() {
        return stepBuilderFactory.get("reader")
            .tasklet(reader)
            .build();
    }

    @Bean
    public Step processor() {
        return stepBuilderFactory.get("processor")
            .tasklet(processor)
            .build();
    }

    @Bean
    public Step writer() {
        return stepBuilderFactory.get("writer")
            .tasklet(writer)
            .build();
    }

    @Bean
    public Job testJob() {
        return jobBuilderFactory.get("testJob")
            .start(reader())
            .next(processor())
            .next(writer())
            .build();
    }

    // Below will start the job
    @Scheduled(fixedRate = 1000)
    public void startJob() throws Exception {
        Map<String, JobParameter> confMap = new HashMap<>();
        confMap.put("time", new JobParameter(new Date()));
        JobParameters jobParameters = new JobParameters(confMap);
        monitorJobLauncher.run(testJob(), jobParameters);
    }
    

    JobContext.java

    public class JobContext {

        private List<StuffDto> dataToProcess = new ArrayList<>();
        private List<StuffDto> dataToWrite = new ArrayList<>();

        // getters and setters for both lists
    }

    SampleReader.java

    @Component
    public class SampleReader implements Tasklet, StepExecutionListener {

        @Autowired
        private JobContext context;

        @Override
        public void beforeStep(StepExecution stepExecution) {
            // logic that you need to perform before the execution of this step.
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            // logic that you need to perform after the execution of this step.
            return ExitStatus.COMPLETED;
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
            // Whatever code is here will get executed for the reader:
            // fetch StuffDto objects from the database and add them to the
            // JobContext dataToProcess list.
            return RepeatStatus.FINISHED;
        }
    }
    

    SampleProcessor.java

    @Component
    public class SampleProcessor implements Tasklet {

        @Autowired
        private JobContext context;

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
            // Whatever code is here will get executed for the processor:
            // read context.getDataToProcess(), apply business logic,
            // and set the data for the writer via context.getDataToWrite().
            return RepeatStatus.FINISHED;
        }
    }

    The writer class follows the same pattern.

    Note: You will need to write the database-related boilerplate code on your own, but this way you have more control over your logic and no context size limit to worry about. All the data is held in memory, so it becomes eligible for garbage collection as soon as the operation is done. I hope this conveys the idea.
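    Stripped of the Spring wiring, the pattern above reduces to a shared mutable holder handed from phase to phase. Below is a minimal plain-Java sketch of that idea (class and field names here are illustrative, not Spring Batch API):

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative stand-in for the @JobScope JobContext bean above.
    class SharedJobContext {
        private final List<String> dataToProcess = new ArrayList<>();
        private final List<String> dataToWrite = new ArrayList<>();

        List<String> getDataToProcess() { return dataToProcess; }
        List<String> getDataToWrite() { return dataToWrite; }
    }

    public class SharedContextDemo {
        public static void main(String[] args) {
            SharedJobContext context = new SharedJobContext();

            // "Reader" phase: fetch rows (hard-coded here) into the shared context.
            context.getDataToProcess().add("stuffA-1");
            context.getDataToProcess().add("stuffA-2");

            // "Processor" phase: transform and stage results for the writer.
            for (String row : context.getDataToProcess()) {
                context.getDataToWrite().add(row.toUpperCase());
            }

            // "Writer" phase: consume the staged results.
            System.out.println(context.getDataToWrite());
        }
    }
    ```

    In the real job, @JobScope gives each job execution its own instance of the holder, so concurrent job runs do not see each other's data.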

    To read more about Tasklet vs Chunk, read this.