java spring multithreading spring-batch flatfilereader

Can I use FlatFileItemReader with a TaskExecutor?


Can I use FlatFileItemReader with a TaskExecutor in Spring Batch?

I have implemented a FlatFileItemReader with a ThreadPoolTaskExecutor. When I print the records in the ItemProcessor, the results are inconsistent: not all records are printed, and sometimes a record is printed more than once. This suggests that FlatFileItemReader is not thread-safe, which the Spring docs also state, yet I have seen some blogs claiming that FlatFileItemReader can be used with a TaskExecutor.

So my question is: is there any way to use FlatFileItemReader with a TaskExecutor?

    @Bean
    @StepScope
    public FlatFileItemReader<DataLifeCycleEvent> csvFileReader(
            @Value("#{stepExecution}") StepExecution stepExecution) {

        FlatFileItemReader<DataLifeCycleEvent> itemReader = new FlatFileItemReader<>();

        itemReader.setLineMapper(new OnboardingLineMapper(stepExecution));
        // Skip the header line and hand it to the callback instead.
        itemReader.setLinesToSkip(1);
        itemReader.setSkippedLinesCallback(new OnboardingHeaderMapper(stepExecution));
        // Do not store the read position in the execution context.
        itemReader.setSaveState(false);

        // inputFileLocation is presumably a field of the enclosing configuration class (not shown).
        String inputResourceString = stepExecution.getJobParameters().getString("inputResource");
        Resource inputResource = new FileSystemResource(inputFileLocation + ApplicationConstant.SLASH + inputResourceString);
        itemReader.setResource(inputResource);

        stepExecution.getJobExecution().getExecutionContext().putInt(ApplicationConstant.ERROR_COUNT, 0);
        return itemReader;
    }

Solution

  • FlatFileItemReader extends AbstractItemCountingItemStreamItemReader, which is NOT thread-safe. So if you use it in a multi-threaded step, you need to synchronize access to it.

    You can wrap it in a SynchronizedItemStreamReader, which delegates to your reader and lets only one thread call read() at a time. Here is a quick example:

    @Bean
    public SynchronizedItemStreamReader<DataLifeCycleEvent> itemReader() {
        FlatFileItemReader<DataLifeCycleEvent> itemReader = ... // your item reader
    
        SynchronizedItemStreamReader<DataLifeCycleEvent> synchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
        synchronizedItemStreamReader.setDelegate(itemReader);
        return synchronizedItemStreamReader;
    }
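
To see why the wrapper helps, here is a minimal plain-Java sketch of the same idea. It does not use Spring Batch: `SimpleReader`, `ListReader`, and `SynchronizedReader` are hypothetical stand-ins for an item reader, a non-thread-safe reader, and the synchronizing wrapper. Several worker threads pull from one shared reader, and because `read()` is synchronized, each item should be handed out exactly once.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SynchronizedReaderSketch {

    /** Hypothetical stand-in for an item reader: returns null when the input is exhausted. */
    interface SimpleReader<T> {
        T read();
    }

    /** NOT thread-safe: the position is checked and advanced without any locking. */
    static class ListReader<T> implements SimpleReader<T> {
        private final List<T> items;
        private int position = 0;

        ListReader(List<T> items) {
            this.items = items;
        }

        @Override
        public T read() {
            if (position >= items.size()) {
                return null;
            }
            return items.get(position++);
        }
    }

    /** Wrapper that lets only one thread at a time call the delegate's read(). */
    static class SynchronizedReader<T> implements SimpleReader<T> {
        private final SimpleReader<T> delegate;

        SynchronizedReader(SimpleReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized T read() {
            return delegate.read();
        }
    }

    /** Drains the shared reader from several threads and returns everything that was read. */
    static <T> List<T> readAllConcurrently(SimpleReader<T> reader, int threads) {
        Queue<T> seen = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                T item;
                while ((item = reader.read()) != null) {
                    seen.add(item);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("Workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            input.add(i);
        }
        List<Integer> out = readAllConcurrently(
                new SynchronizedReader<>(new ListReader<>(input)), 4);
        System.out.println(out.size() + " items read, "
                + new HashSet<>(out).size() + " distinct");
    }
}
```

Passing the bare `ListReader` to `readAllConcurrently` instead removes that guarantee, which matches the missing and duplicated records described in the question. Note that synchronizing the reader only serializes reads; the order in which items reach the processor is still not guaranteed in a multi-threaded step.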