java, spring-batch

JdbcPagingItemReader misses a few records when used with SimpleAsyncTaskExecutor


I am trying to read from a table and dump the data into a file.

My job configuration looks like this:

Task executor -->

@Bean
public TaskExecutor tvlTaskExecutor() {
    // starts a new thread per task; at most 5 tasks run at the same time
    SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor();
    asyncTaskExecutor.setConcurrencyLimit(5);
    return asyncTaskExecutor;
}
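For context on what this bean does: SimpleAsyncTaskExecutor does not pool threads. It starts a new thread for every submitted task, and with setConcurrencyLimit(5) it blocks further submissions while 5 tasks are already running. The class below is a stdlib-only sketch of that behaviour using a Semaphore as the throttle; ThrottledThreadPerTaskExecutor and maxObservedConcurrency are hypothetical names, not Spring API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for SimpleAsyncTaskExecutor with setConcurrencyLimit(n):
// every task gets a brand-new thread (no pooling), and execute() blocks
// the caller while n tasks are already running.
class ThrottledThreadPerTaskExecutor implements Executor {
    private final Semaphore permits;

    ThrottledThreadPerTaskExecutor(int concurrencyLimit) {
        this.permits = new Semaphore(concurrencyLimit);
    }

    @Override
    public void execute(Runnable task) {
        // Wait until fewer than concurrencyLimit tasks are running.
        permits.acquireUninterruptibly();
        Thread thread = new Thread(() -> {
            try {
                task.run();
            } finally {
                permits.release(); // free the slot when the task ends
            }
        });
        thread.start();
    }

    // Runs `tasks` short jobs and returns the highest number seen running at once.
    static int maxObservedConcurrency(int concurrencyLimit, int tasks) {
        ThrottledThreadPerTaskExecutor executor = new ThrottledThreadPerTaskExecutor(concurrencyLimit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                int now = running.incrementAndGet();
                maxRunning.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(5); // simulate some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return maxRunning.get();
    }
}
```

Calling maxObservedConcurrency(5, 40) should never report more than 5 tasks running together.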

Step config -->

@Bean
public Step tvlFileCreationStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
    return new StepBuilder("tvl-file-creation-step", jobRepository)
            .<String, String>chunk(50, transactionManager)
            .reader(tvlTableDataItemReader())
            .writer(tvlFileWriter)
            .listener(tvlStepExecutionListener)
            .taskExecutor(tvlTaskExecutor())
            .build();
}
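With .taskExecutor(...) on the step, each worker thread repeatedly reads up to one chunk (50 items here) from the same reader instance and then writes that chunk, so chunks from different threads can be written in any order. The plain-Java simulation below assumes a shared reader whose read() is synchronized; MultiThreadedChunkSimulation and SharedReader are hypothetical names, and nothing here is Spring Batch API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation of a multi-threaded chunk-oriented step:
// several workers share ONE reader; each pulls up to chunkSize items,
// then "writes" its chunk.
class MultiThreadedChunkSimulation {

    // Shared reader; read() is synchronized so two threads never get the same item.
    static final class SharedReader {
        private final List<String> source;
        private int next = 0;

        SharedReader(List<String> source) {
            this.source = source;
        }

        synchronized String read() {
            return next < source.size() ? source.get(next++) : null; // null = end of input
        }
    }

    // Runs the simulated step and returns every chunk in the order it was written.
    static List<List<String>> run(List<String> input, int chunkSize, int threads) {
        SharedReader reader = new SharedReader(input);
        ConcurrentLinkedQueue<List<String>> written = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                while (true) {
                    List<String> chunk = new ArrayList<>(chunkSize);
                    String item;
                    while (chunk.size() < chunkSize && (item = reader.read()) != null) {
                        chunk.add(item);
                    }
                    if (chunk.isEmpty()) {
                        return; // input exhausted
                    }
                    written.add(chunk); // "write" the chunk
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(written);
    }
}
```

Because reads interleave item by item, one chunk may hold non-contiguous keys and the last chunks can be smaller than 50, but every item is still read exactly once.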

Job config -->

@Bean(name = "tvlRunJob")
public Job tvlRunJob(JobRepository jobRepository, Step tvlFileCreationStep) {
    return new JobBuilder("tvlFileCreationJob", jobRepository)
            .start(tvlFileCreationStep)
            .build();
}

And finally, the reader -->

public JdbcPagingItemReader<String> jdbcPagingItemReader(DataSource dataSource) throws Exception {
    // read ITAG key records over JDBC, one page at a time
    JdbcPagingItemReader<String> reader = new JdbcPagingItemReader<String>();
    reader.setDataSource(dataSource);
    reader.setPageSize(50);
    reader.setSaveState(true);
    reader.setQueryProvider(createQuery(dataSource));
    reader.setRowMapper(new TVLRowMapper());
    return reader;
}

private PagingQueryProvider createQuery(DataSource dataSource) throws Exception {
    final Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("TAG_SERIAL_NUMBER", Order.ASCENDING);

    final SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
    queryProvider.setDataSource(dataSource);
    queryProvider.setSelectClause(" TAG_SERIAL_NUMBER ");
    queryProvider.setFromClause(" ITAGENTITY_10052024 ");
    queryProvider.setWhereClause(" TAG_SERIAL_NUMBER like '09900%' ");
    queryProvider.setSortKeys(sortKeys);
    return queryProvider.getObject();
}
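JdbcPagingItemReader pages by sort key rather than by offset: roughly, the first page is the first 50 matching rows ordered by TAG_SERIAL_NUMBER, and each later page asks for rows whose TAG_SERIAL_NUMBER is greater than the last key already read, with a database-specific row limit. Below is an in-memory model of that idea with no database involved; KeysetPagingSketch, nextPage and readAllPages are hypothetical names. It uses a TreeSet because this kind of paging assumes the sort key is unique; duplicate key values at a page boundary could be skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical in-memory model of keyset ("sort key") paging:
// page 1 = first pageSize matching keys in ascending order;
// page n+1 = first pageSize matching keys strictly greater than the last key of page n.
class KeysetPagingSketch {

    static List<String> nextPage(TreeSet<String> table, String prefix, String lastKey, int pageSize) {
        List<String> page = new ArrayList<>(pageSize);
        // tailSet(lastKey, false) plays the role of "AND TAG_SERIAL_NUMBER > ?"
        Iterable<String> candidates = (lastKey == null) ? table : table.tailSet(lastKey, false);
        for (String key : candidates) {
            if (!key.startsWith(prefix)) {
                continue; // plays the role of "TAG_SERIAL_NUMBER like '09900%'"
            }
            page.add(key);
            if (page.size() == pageSize) {
                break; // plays the role of the database-specific row limit
            }
        }
        return page;
    }

    // Fetches pages until an empty one comes back; returns the pages in order.
    static List<List<String>> readAllPages(TreeSet<String> table, String prefix, int pageSize) {
        List<List<String>> pages = new ArrayList<>();
        String lastKey = null;
        while (true) {
            List<String> page = nextPage(table, prefix, lastKey, pageSize);
            if (page.isEmpty()) {
                return pages;
            }
            pages.add(page);
            lastKey = page.get(page.size() - 1); // remember where this page ended
        }
    }
}
```

With 700 keys starting with 09900 and a page size of 50, readAllPages returns 14 full pages.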

My writer config -->

It uses XMLOutputFactory2 from Stax2, and XMLStreamWriter2 from the Stax2 implementation (package org.codehaus.stax2.*) as the writer, which is thread-safe according to its documentation.
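If you would rather not rely on the writer's own thread-safety guarantee, one option is to serialize access to the shared stream writer so that a whole chunk is written under one lock. The sketch below assumes the JDK's javax.xml.stream API in place of Stax2 (XMLOutputFactory2 and XMLStreamWriter2 extend these types); LockedXmlChunkWriter, demo, and the element names tags and tag are made up for illustration.

```java
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

// Hypothetical chunk writer sharing one XMLStreamWriter between worker threads.
// Each chunk is written while holding one lock, so elements emitted by
// different threads cannot interleave inside each other.
class LockedXmlChunkWriter {
    private final StringWriter out = new StringWriter();
    private final XMLStreamWriter xml;
    private final Object lock = new Object();

    LockedXmlChunkWriter() throws XMLStreamException {
        xml = XMLOutputFactory.newInstance().createXMLStreamWriter(out);
        xml.writeStartDocument();
        xml.writeStartElement("tags"); // hypothetical root element
    }

    void write(List<String> chunk) throws XMLStreamException {
        synchronized (lock) {
            for (String serial : chunk) {
                xml.writeStartElement("tag"); // hypothetical item element
                xml.writeCharacters(serial);
                xml.writeEndElement();
            }
        }
    }

    String close() throws XMLStreamException {
        synchronized (lock) {
            xml.writeEndElement(); // closes <tags>
            xml.writeEndDocument();
            xml.flush();
            return out.toString();
        }
    }

    // Demo: several threads write chunks concurrently; returns the final XML text.
    static String demo(int threads, int chunksPerThread, int chunkSize) {
        try {
            LockedXmlChunkWriter writer = new LockedXmlChunkWriter();
            Thread[] workers = new Thread[threads];
            for (int t = 0; t < threads; t++) {
                final int id = t;
                workers[t] = new Thread(() -> {
                    for (int c = 0; c < chunksPerThread; c++) {
                        List<String> chunk = new ArrayList<>();
                        for (int i = 0; i < chunkSize; i++) {
                            chunk.add(id + "-" + c + "-" + i);
                        }
                        try {
                            writer.write(chunk);
                        } catch (XMLStreamException e) {
                            throw new IllegalStateException(e);
                        }
                    }
                });
                workers[t].start();
            }
            for (Thread w : workers) {
                w.join();
            }
            return writer.close();
        } catch (XMLStreamException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```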

With a single-threaded step all records are read, but when I use the task executor a few records are missed: 4-5 out of 700.

(Note: the actual volume is huge, so I need a multi-threaded approach.)

Please let me know if I have done anything wrong. Any help is appreciated, thank you.


Solution

  • It was a very silly miss: when JdbcPagingItemReader is used with an async task executor, it must not save its state.

    I was using SimpleAsyncTaskExecutor, which runs chunks concurrently and hands them to the writer in no fixed order, so my writer was fine. The symptom was that some runs produced a few extra records and others a few fewer.

    So for async execution, just make sure you set saveState to false:

    JdbcPagingItemReader<String> reader = new JdbcPagingItemReader<String>();
    reader.setDataSource(dataSource);
    reader.setPageSize(50);
    // changed line: do not save reader state in a multi-threaded step
    reader.setSaveState(false);
    reader.setQueryProvider(createQuery(dataSource));
    reader.setRowMapper(new TVLRowMapper());
    return reader;
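    Why saved state is unreliable here: the reader keeps a single "items read so far" counter, but in a multi-threaded step chunks commit out of order, so the counter saved at a commit can be ahead of what has actually been written. The deterministic replay below (hypothetical names, no real threads or Spring classes) shows a restart from such a checkpoint skipping a chunk that never committed. With saveState set to false nothing is saved, which also means the step cannot be restarted from where it stopped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, deterministic replay of two worker threads sharing one counting reader.
// It shows why one "items read so far" checkpoint is meaningless when chunks
// commit out of order.
class SharedCheckpointSketch {

    // Returns the items that would never be processed if the job failed right after
    // chunk B committed and was then restarted from the saved read count.
    static List<Integer> itemsLostOnRestart(int totalItems, int chunkSize) {
        int readCount = 0; // the reader's shared state

        List<Integer> chunkA = new ArrayList<>(); // thread A reads first...
        for (int i = 0; i < chunkSize; i++) {
            chunkA.add(readCount++);
        }
        List<Integer> chunkB = new ArrayList<>(); // ...then thread B reads the next items
        for (int i = 0; i < chunkSize; i++) {
            chunkB.add(readCount++);
        }

        // Thread B finishes first: its chunk is written and the checkpoint is saved.
        List<Integer> committed = new ArrayList<>(chunkB);
        int savedReadCount = readCount; // 2 * chunkSize, although only chunkSize items were written

        // The job fails before thread A commits; a restart resumes at savedReadCount.
        for (int i = savedReadCount; i < totalItems; i++) {
            committed.add(i);
        }

        List<Integer> lost = new ArrayList<>();
        for (int i = 0; i < totalItems; i++) {
            if (!committed.contains(i)) {
                lost.add(i);
            }
        }
        return lost;
    }
}
```

    For 700 items and a chunk size of 50, the items of thread A's chunk (0 to 49) are the ones lost.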