I have a requirement to rewrite an existing application with Spring Batch that reads from multiple flat files, does processing, and then writes data back out to flat files.
The first input file contains key data, and the remaining input files may, or may not, contain transaction data for the key from the first file. I was thinking that it would make sense to use a FlatFileItemReader
to get the key data, and use a series of SingleItemPeekableItemReader
in the processor to build the transaction data and add it to the Data object (using the peekable reader since the key may not be in the extra files - yes, the files come pre-sorted).
Pseudocode
while(<more data>) {
while (data.getKey() == peekablePeek.getKey()) {
data.addTransaction(peekable.read());
// repeat for each transaction file
}
// do regular processing for transactions (which will update <data> object fields based)
Is this even possible? I tried injecting a SingleItemPeekableItemReader
into the ItemProcessor
but that just gets a Reader not Open
exception when the processor tries to do the peek.
Any ideas would be greatly appreciated.
Turns out, I needed to make sure that the files were open before hitting the processor. What I did was set up two Tasklets - one to open the file, and one to close the file, and then added them to the job configuration as appropriate (Open
before the regular step, and Close
after the regular step).
Tasklets:
public class OpenSubFileTask implements Tasklet {
@Autowired
SingleItemPeekableItemReader<Data> readerPeek;
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception
{
readerPeek.open(contribution.getStepExecution().getExecutionContext());
return RepeatStatus.FINISHED;
}
public class CloseSubFileTask implements Tasklet {
@Autowired
SingleItemPeekableItemReader<Data> readerPeek;
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception
{
readerPeek.close();
return RepeatStatus.FINISHED;
}
}
Job Configuration:
@Bean
public Job importUserJob(JobNotificationListener listener,
Step stepOpen, Step stepClose, Step step1) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.start(stepOpen)
.next(step1)
.next(stepClose)
.build();
}