I want to make a Spring Batch job temporised. That is, I have one CSV file with more than 100,000 records, and I want to read it in chunks, one chunk each day: for example, the first 100 records on day one, the next 100 records on day two, and so on. This is to minimise the load on our systems that consume this data.
@Configuration
@AllArgsConstructor
public class SpringBatchConfig {

    private final GrpDataWriter writer;

    @Bean
    public FlatFileItemReader<GrpData> reader() {
        FlatFileItemReader<GrpData> itemReader = new FlatFileItemReader<>();
        itemReader.setResource(new FileSystemResource("src/main/resources/grpData.csv"));
        itemReader.setName("dataReader");
        itemReader.setLinesToSkip(1);
        itemReader.setLineMapper(lineMapper());
        return itemReader;
    }

    private LineMapper<GrpData> lineMapper() {
        DefaultLineMapper<GrpData> lineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setDelimiter(",");
        lineTokenizer.setStrict(false);
        lineTokenizer.setNames("id", "first_name", "last_name", "email");
        BeanWrapperFieldSetMapper<GrpData> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(GrpData.class);
        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        return lineMapper;
    }

    @Bean
    public GrpDataProcessor processor() {
        return new GrpDataProcessor();
    }

    @Bean
    public Step step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("step", jobRepository)
                .<GrpData, GrpData>chunk(10, transactionManager)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }

    @Bean
    public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new JobBuilder("importGrpData", jobRepository)
                .flow(step(jobRepository, transactionManager))
                .end()
                .build();
    }
}
@Component
@AllArgsConstructor
public class JobScheduler {

    private final JobLauncher jobLauncher;
    private final Job job;
    private static int count = 0;

    @Scheduled(fixedDelay = 6000, initialDelay = 5000)
    public void scheduleJob() {
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("startsAt", System.currentTimeMillis())
                .toJobParameters();
        try {
            jobLauncher.run(job, jobParameters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
What change should I make here to achieve this? Or should I create a different scheduler? Thanks in advance.
I see you already have itemReader.setLinesToSkip(1). You could achieve this by using the linesToSkip and maxItemCount parameters. On day one, linesToSkip=1 and maxItemCount=100; on day two you would set linesToSkip=101 and maxItemCount=100, and so on. This way, the job would only process a hundred items each day. The input file and the offset (i.e. the linesToSkip value) should be passed as job parameters so that each day yields a different job instance, which you could restart in case of failure:
@Bean
@StepScope
public FlatFileItemReader<GrpData> reader(
        @Value("#{jobParameters['file']}") String file,
        @Value("#{jobParameters['linesToSkip']}") Integer linesToSkip) {
    FlatFileItemReader<GrpData> itemReader = new FlatFileItemReader<>();
    itemReader.setResource(new FileSystemResource(file));
    itemReader.setName("dataReader");
    itemReader.setLinesToSkip(linesToSkip);
    itemReader.setMaxItemCount(100);
    itemReader.setLineMapper(lineMapper());
    return itemReader;
}
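On the scheduling side, here is a minimal sketch of a scheduler that supplies those job parameters once a day. It is illustrative only: the class name, the cron expression, the hard-coded file path and the in-memory offset counter are assumptions, not part of your code. In a real setup you would derive the next offset from the job repository (or persist it somewhere durable) so it survives a restart.

import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import lombok.AllArgsConstructor;

@Component
@AllArgsConstructor
public class DailyJobScheduler {

    private final JobLauncher jobLauncher;
    private final Job job;

    // Illustrative only: an in-memory offset is lost on restart; in practice,
    // look it up from the last successful job execution or persist it.
    private static final AtomicInteger NEXT_LINES_TO_SKIP = new AtomicInteger(1); // 1 skips only the header on day one

    @Scheduled(cron = "0 0 2 * * *") // example: once a day at 02:00
    public void scheduleDailyImport() {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("file", "src/main/resources/grpData.csv")
                .addLong("linesToSkip", (long) NEXT_LINES_TO_SKIP.get())
                .toJobParameters();
        try {
            jobLauncher.run(job, jobParameters);
            NEXT_LINES_TO_SKIP.addAndGet(100); // the next run starts 100 lines further into the file
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Because the linesToSkip parameter changes every day, each run is a distinct job instance, which keeps the restart semantics mentioned above.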
A second option is to split the file beforehand, if that is possible. With that in place, I would launch one job instance per day with the day's partition file as a job parameter.
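As a rough illustration of that second option (the splitter class, partition naming and chunk size below are assumptions, not an existing API), a one-off pre-processing step could cut the CSV into per-day partition files; the scheduler would then pass the day's partition path as the file job parameter:

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class CsvSplitter {

    // Splits the source CSV into partition-0.csv, partition-1.csv, ... of chunkSize data lines each,
    // repeating the header in every partition so the existing reader configuration still applies.
    public static List<Path> split(Path source, Path targetDir, int chunkSize) throws IOException {
        Files.createDirectories(targetDir);
        List<Path> partitions = new ArrayList<>();
        try (BufferedReader reader = Files.newBufferedReader(source)) {
            String header = reader.readLine();
            String line;
            List<String> buffer = new ArrayList<>();
            int index = 0;
            while ((line = reader.readLine()) != null) {
                buffer.add(line);
                if (buffer.size() == chunkSize) {
                    partitions.add(writePartition(targetDir, index++, header, buffer));
                    buffer.clear();
                }
            }
            if (!buffer.isEmpty()) {
                partitions.add(writePartition(targetDir, index, header, buffer));
            }
        }
        return partitions;
    }

    private static Path writePartition(Path dir, int index, String header, List<String> lines) throws IOException {
        Path partition = dir.resolve("partition-" + index + ".csv");
        List<String> content = new ArrayList<>();
        content.add(header);
        content.addAll(lines);
        return Files.write(partition, content);
    }
}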
A third option is to ingest the file into a staging table and run one job per day, passing the start/end offsets as job parameters.
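For that third option, a minimal sketch of a step-scoped reader over the staging table could look like the following; it would replace the flat file reader in the batch configuration. The table and column names (grp_data_staging, id, first_name, ...) and the startId/endId parameter names are assumptions for the example, and the row mapping depends on how GrpData is actually defined.

@Bean
@StepScope
public JdbcCursorItemReader<GrpData> stagingReader(
        DataSource dataSource,
        @Value("#{jobParameters['startId']}") Long startId,
        @Value("#{jobParameters['endId']}") Long endId) {
    // Reads only the slice of staged rows between the offsets passed as job parameters.
    return new JdbcCursorItemReaderBuilder<GrpData>()
            .name("stagingReader")
            .dataSource(dataSource)
            .sql("SELECT id, first_name, last_name, email FROM grp_data_staging WHERE id >= ? AND id < ?")
            .preparedStatementSetter(ps -> {
                ps.setLong(1, startId);
                ps.setLong(2, endId);
            })
            .rowMapper(new BeanPropertyRowMapper<>(GrpData.class))
            .build();
}

The scheduler would then add startId/endId to the job parameters each day instead of linesToSkip, which again yields one job instance per day.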