spring, spring-boot, spring-batch

Temporising a Spring Batch Job


I want to make a Spring Batch job temporised. That is, I have one CSV file with more than 100 000 records, and I want to read it in chunks, one chunk per day: for example, the first 100 records on the first day, the next 100 records on the next day, and so on. This is to minimise the load on our systems that consume this data.

@Configuration
@AllArgsConstructor
public class SpringBatchConfig {

    private final GrpDataWriter writer;

    @Bean
    public FlatFileItemReader<GrpData> reader() {
        FlatFileItemReader<GrpData> itemReader = new FlatFileItemReader<>();
        itemReader.setResource(new FileSystemResource("src/main/resources/grpData.csv"));
        itemReader.setName("dataReader");
        itemReader.setLinesToSkip(1);
        itemReader.setLineMapper(lineMapper());
        return itemReader;
    }

    private LineMapper<GrpData> lineMapper() {
        DefaultLineMapper<GrpData> lineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setDelimiter(",");
        lineTokenizer.setStrict(false);
        lineTokenizer.setNames("id", "first_name", "last_name", "email");
        BeanWrapperFieldSetMapper<GrpData> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(GrpData.class);
        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        return lineMapper;
    }

    @Bean
    public GrpDataProcessor processor() {
        return new GrpDataProcessor();
    }

    @Bean
    public Step step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("step", jobRepository)
                .<GrpData, GrpData>chunk(10, transactionManager)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }

    @Bean
    public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new JobBuilder("importGrpData", jobRepository)
                .flow(step(jobRepository, transactionManager)).end().build();
    }
}

@Component
@AllArgsConstructor
public class JobScheduler {

    private final JobLauncher jobLauncher;

    private final Job job;

    private static int count = 0;

    @Scheduled(fixedDelay = 6000, initialDelay = 5000)
    public void scheduleJob() {
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("startsAt", System.currentTimeMillis()).toJobParameters();
        try{
            jobLauncher.run(job, jobParameters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

What change should I make here to achieve this? Or should I create a different scheduler? Thanks in advance.


Solution

  • I see you already have itemReader.setLinesToSkip(1). You could achieve this with the linesToSkip and maxItemCount parameters: on day one, linesToSkip=1 and maxItemCount=100; on day two, linesToSkip=101 and maxItemCount=100; and so on. This way, the job would only process a hundred items each day. The input file and the offset (i.e. the linesToSkip value) should be passed as job parameters, so that each run is a distinct job instance that you could restart in case of failure:

    @Bean
    @StepScope
    public FlatFileItemReader<GrpData> reader(
            @Value("#{jobParameters['file']}") String file,
            @Value("#{jobParameters['linesToSkip']}") Integer linesToSkip) {

        FlatFileItemReader<GrpData> itemReader = new FlatFileItemReader<>();
        itemReader.setResource(new FileSystemResource(file));
        itemReader.setName("dataReader");
        itemReader.setLinesToSkip(linesToSkip);
        itemReader.setMaxItemCount(100);
        itemReader.setLineMapper(lineMapper());
        return itemReader;
    }
    

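    On the scheduling side of this first option, here is a minimal sketch of how the offset could be computed and passed along with the file as job parameters. The daily cron expression, the in-memory run counter and the hard-coded file path are assumptions, not part of the original setup; in practice you would persist the offset or derive it from the job repository:

    @Component
    @AllArgsConstructor
    public class DailyJobScheduler {

        private final JobLauncher jobLauncher;
        private final Job job;

        // Assumed: a simple in-memory counter of completed runs. Persist it (or
        // derive it from the last successful job instance) if the app can restart.
        private static int day = 0;

        @Scheduled(cron = "0 0 1 * * *") // assumed: once a day at 01:00
        public void scheduleJob() {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("file", "src/main/resources/grpData.csv") // assumed path
                    // 1 header line + 100 records for every day already processed
                    .addLong("linesToSkip", 1L + day * 100L)
                    .toJobParameters();
            try {
                jobLauncher.run(job, jobParameters);
                day++;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    Note that with a step-scoped reader the step bean can no longer call reader() with no arguments; it would instead receive the step-scoped proxy as a parameter, for example:

    @Bean
    public Step step(JobRepository jobRepository,
                     PlatformTransactionManager transactionManager,
                     FlatFileItemReader<GrpData> reader) {
        return new StepBuilder("step", jobRepository)
                .<GrpData, GrpData>chunk(10, transactionManager)
                .reader(reader)
                .processor(processor())
                .writer(writer)
                .build();
    }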
    A second option is to split the file, if possible. With that in place, I would launch one job instance per day with the file partition as a job parameter.
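    For illustration, a small utility along these lines could produce the daily partition files (the 100-record chunk size, the output file names and keeping the header in every partition are assumptions); each partition file would then be passed as the file job parameter for that day's run:

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.ArrayList;
    import java.util.List;

    public class CsvSplitter {

        public static void main(String[] args) throws Exception {
            List<String> lines = Files.readAllLines(Path.of("src/main/resources/grpData.csv"));
            String header = lines.get(0);
            int chunkSize = 100;

            for (int part = 0, from = 1; from < lines.size(); part++, from += chunkSize) {
                int to = Math.min(from + chunkSize, lines.size());
                List<String> chunk = new ArrayList<>();
                chunk.add(header); // keep the header so setLinesToSkip(1) still applies
                chunk.addAll(lines.subList(from, to));
                Files.write(Path.of("src/main/resources/grpData-part-" + part + ".csv"), chunk);
            }
        }
    }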

    A third option is to ingest the file into a staging table and run a job per day, passing the start/end offsets as job parameters.
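    For that third option, the daily step could use a step-scoped JDBC reader driven by the offsets. Here is a minimal sketch, assuming the file has already been loaded into a grp_data_staging table with a numeric id to range over (the table name, column names and parameter names are assumptions):

    @Bean
    @StepScope
    public JdbcCursorItemReader<GrpData> stagingReader(
            DataSource dataSource,
            @Value("#{jobParameters['startId']}") Long startId,
            @Value("#{jobParameters['endId']}") Long endId) {

        return new JdbcCursorItemReaderBuilder<GrpData>()
                .name("stagingReader")
                .dataSource(dataSource)
                // assumed staging table and columns; the id range selects one day's slice
                .sql("SELECT id, first_name, last_name, email FROM grp_data_staging "
                        + "WHERE id >= ? AND id < ? ORDER BY id")
                .preparedStatementSetter(ps -> {
                    ps.setLong(1, startId);
                    ps.setLong(2, endId);
                })
                .rowMapper(new BeanPropertyRowMapper<>(GrpData.class))
                .build();
    }

    The scheduler would then pass startId and endId (for example 1 and 101 on day one, 101 and 201 on day two) instead of linesToSkip.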