I am trying to read records from a database table with Spring Batch and dump the data into a file.
My job configuration looks like this:
Task executor -->
@Bean
public TaskExecutor tvlTaskExecutor() {
    SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor();
    asyncTaskExecutor.setConcurrencyLimit(5);
    return asyncTaskExecutor;
}
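For context, the SimpleAsyncTaskExecutor Javadoc says it starts a new thread for every task instead of reusing a pool. With setConcurrencyLimit(5), further submissions wait while five tasks are already running. The plain-JDK sketch below models that behavior with a Semaphore. ThrottledThreadPerTaskExecutor and its methods are hypothetical names for illustration, not Spring's implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a throttled "new thread per task" executor; this is NOT Spring's source.
class ThrottledThreadPerTaskExecutor {
    private final Semaphore permits;

    ThrottledThreadPerTaskExecutor(int concurrencyLimit) {
        this.permits = new Semaphore(concurrencyLimit);
    }

    // Blocks the caller until a slot is free, then runs the task on a brand-new thread.
    void execute(Runnable task) {
        permits.acquireUninterruptibly();
        new Thread(() -> {
            try {
                task.run();
            } finally {
                permits.release(); // free the slot once the task finishes
            }
        }).start();
    }

    // Submits `tasks` short tasks and reports the highest number seen running at the same time.
    static int maxObservedConcurrency(int limit, int tasks) {
        ThrottledThreadPerTaskExecutor executor = new ThrottledThreadPerTaskExecutor(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                max.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(10); // simulate processing one chunk
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent tasks: " + maxObservedConcurrency(5, 20));
    }
}
```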
Step config -->
@Bean
public Step tvlFileCreationStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
    return new StepBuilder("tvl-file-creation-step", jobRepository)
            .<String, String>chunk(50, transactionManager)
            .reader(tvlTableDataItemReader())
            .writer(tvlFileWriter)
            .listener(tvlStepExecutionListener)
            .taskExecutor(tvlTaskExecutor())
            .build();
}
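With .taskExecutor(...) this becomes a multi-threaded step. Each worker thread reads, processes and writes a whole chunk on its own, and all workers share the single reader instance. That means the reader has to be thread-safe, and chunks reach the writer in no fixed order. The following is a simplified, hypothetical model of that flow in plain Java. MultiThreadedChunkModel and SharedReader are illustrative names, not Spring Batch classes. With a synchronized read(), every item is written exactly once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model of a multi-threaded chunk-oriented step; this is NOT Spring Batch's code.
class MultiThreadedChunkModel {

    // Shared reader: read() is synchronized, so each item is handed out exactly once.
    static final class SharedReader {
        private final int total;
        private int next = 0;

        SharedReader(int total) {
            this.total = total;
        }

        synchronized Integer read() {
            return next < total ? next++ : null; // null means "no more input"
        }
    }

    // Each worker repeatedly builds a chunk from the shared reader and "writes" it.
    static List<Integer> run(int totalItems, int chunkSize, int threads) {
        SharedReader reader = new SharedReader(totalItems);
        List<Integer> written = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                while (true) {
                    List<Integer> chunk = new ArrayList<>();
                    Integer item;
                    while (chunk.size() < chunkSize && (item = reader.read()) != null) {
                        chunk.add(item);
                    }
                    if (chunk.isEmpty()) {
                        return; // input exhausted
                    }
                    written.addAll(chunk); // chunks from different workers land in any order
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return written;
    }

    public static void main(String[] args) {
        List<Integer> written = run(700, 50, 5);
        System.out.println("written: " + written.size()
                + ", distinct: " + written.stream().distinct().count());
    }
}
```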
Job config -->
@Bean(name = "tvlRunJob")
public Job tvlRunJob(JobRepository jobRepository, Step tvlFileCreationStep) {
    return new JobBuilder("tvlFileCreationJob", jobRepository)
            .start(tvlFileCreationStep)
            .build();
}
And finally, the reader -->
public JdbcPagingItemReader<String> jdbcPagingItemReader(DataSource dataSource) throws Exception {
    // read the ITAG key records with JDBC, one page at a time
    JdbcPagingItemReader<String> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(dataSource);
    reader.setPageSize(50);
    reader.setSaveState(true);
    reader.setQueryProvider(createQuery(dataSource));
    reader.setRowMapper(new TVLRowMapper());
    return reader;
}
private PagingQueryProvider createQuery(DataSource dataSource) throws Exception {
    final Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("TAG_SERIAL_NUMBER", Order.ASCENDING);
    final SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
    queryProvider.setDataSource(dataSource);
    queryProvider.setSelectClause(" TAG_SERIAL_NUMBER ");
    queryProvider.setFromClause(" ITAGENTITY_10052024 ");
    queryProvider.setWhereClause(" TAG_SERIAL_NUMBER like '09900%' ");
    queryProvider.setSortKeys(sortKeys);
    return queryProvider.getObject();
}
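SqlPagingQueryProviderFactoryBean builds a first-page query plus a query for the remaining pages. Roughly, the remaining-pages query fetches rows whose sort key is greater than the last key already read (TAG_SERIAL_NUMBER greater than the last value, ordered by TAG_SERIAL_NUMBER). The in-memory sketch below models that idea; KeysetPagingModel is a hypothetical name. It also shows why the paging sort key should be unique: a duplicated key that straddles a page boundary is silently skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of sort-key ("keyset") paging; this is NOT Spring Batch's code.
class KeysetPagingModel {

    // Next page: up to pageSize keys strictly greater than lastKey (lastKey == null means first page).
    static List<String> page(List<String> sortedKeys, String lastKey, int pageSize) {
        List<String> page = new ArrayList<>();
        for (String key : sortedKeys) {
            if (page.size() == pageSize) {
                break;
            }
            if (lastKey == null || key.compareTo(lastKey) > 0) {
                page.add(key);
            }
        }
        return page;
    }

    // Reads every page, remembering only the last sort key of the previous page.
    static List<String> readAll(List<String> sortedKeys, int pageSize) {
        List<String> all = new ArrayList<>();
        String lastKey = null;
        while (true) {
            List<String> page = page(sortedKeys, lastKey, pageSize);
            if (page.isEmpty()) {
                return all;
            }
            all.addAll(page);
            lastKey = page.get(page.size() - 1);
        }
    }

    public static void main(String[] args) {
        List<String> unique = new ArrayList<>();
        for (int i = 0; i < 700; i++) {
            unique.add(String.format("09900%05d", i)); // zero-padded, so string order = numeric order
        }
        System.out.println("unique keys read: " + readAll(unique, 50).size() + " of " + unique.size());

        // "C" appears twice and straddles a page boundary, so the second "C" is skipped.
        List<String> duplicated = List.of("A", "B", "C", "C", "D");
        System.out.println("duplicated keys read: " + readAll(duplicated, 3).size() + " of 5");
    }
}
```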
My writer config -->
It uses XMLOutputFactory2 from Stax2 to create an XMLStreamWriter2 (package org.codehaus.stax2.*), which is thread safe according to its documentation.
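If a writer ever does turn out to be unsafe to share between chunk threads, one common approach is to serialize whole-chunk writes behind a lock. That way, elements from different chunks never interleave in the file. The sketch below assumes a plain java.io.Writer rather than the Stax2 API. SerializedChunkWriter and the placeholder element name "tag" are hypothetical, not part of Spring Batch or Stax2.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical chunk writer that lets only one chunk write to the shared output at a time.
class SerializedChunkWriter {
    private final Writer out;
    private final ReentrantLock lock = new ReentrantLock();

    SerializedChunkWriter(Writer out) {
        this.out = out;
    }

    // The whole chunk is written under the lock, so elements from two chunks never interleave.
    void write(List<String> chunk) throws IOException {
        lock.lock();
        try {
            for (String item : chunk) {
                out.write("<tag>" + item + "</tag>\n"); // placeholder element name
            }
        } finally {
            lock.unlock();
        }
    }

    // Writes `chunks` chunks of `chunkSize` items from `threads` worker threads.
    static String demo(int chunks, int chunkSize, int threads) {
        StringWriter buffer = new StringWriter();
        SerializedChunkWriter writer = new SerializedChunkWriter(buffer);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<Object>> results = new ArrayList<>();
        for (int c = 0; c < chunks; c++) {
            List<String> chunk = new ArrayList<>();
            for (int i = 0; i < chunkSize; i++) {
                chunk.add(c + "-" + i);
            }
            results.add(pool.submit(() -> {
                writer.write(chunk);
                return null;
            }));
        }
        try {
            for (Future<Object> result : results) {
                result.get(); // surfaces any IOException from a worker
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        String xml = demo(14, 50, 5);
        System.out.println("elements written: " + xml.split("\n").length);
    }
}
```

Holding the lock for a whole chunk keeps each chunk contiguous in the output; only the write itself becomes sequential.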
With a single-threaded step all records are read, but when I add the taskExecutor a few records go missing: about 4-5 out of 700.
(Note: the actual volume is huge, so I need a multi-threaded approach.)
Please let me know if I have done anything wrong. Any help is appreciated, thank you.
It was a very silly miss: when JdbcPagingItemReader is used with an async task executor, it must not save its state.
I was using SimpleAsyncTaskExecutor, which processes chunks concurrently and hands them to the writer in no particular order, so my writer was fine. The symptom was that sometimes I got a few extra records and sometimes a few fewer.
So for an async task executor, just make sure you set saveState to false:
JdbcPagingItemReader<String> reader = new JdbcPagingItemReader<>();
reader.setDataSource(dataSource);
reader.setPageSize(50);
// the fix: don't save restart state when the step is multi-threaded
reader.setSaveState(false);
reader.setQueryProvider(createQuery(dataSource));
reader.setRowMapper(new TVLRowMapper());
return reader;
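The JdbcPagingItemReader Javadoc says essentially the same thing: the reader is thread-safe between calls to open(), but saveState should be false when it is used in a multi-threaded client, and restart is then not available. The deterministic sketch below is a simplified, hypothetical model of why restart state breaks. The reader saves a single item counter, and that counter also includes items other threads have read but not yet committed. SavedReadCountModel and its contiguous-chunk assumption are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of restart state in a multi-threaded step; NOT Spring Batch's code.
// Assumes chunk k received the contiguous items k*chunkSize .. (k+1)*chunkSize - 1.
class SavedReadCountModel {

    // savedReadCount: the reader's item counter as saved at the last commit (it also counts
    // items that other threads had read but not yet committed).
    // committedChunks: chunk numbers whose writes actually committed before the failure.
    // Returns the items a restart would skip even though they were never written.
    static List<Integer> lostOnRestart(int chunkSize, List<Integer> committedChunks, int savedReadCount) {
        List<Integer> lost = new ArrayList<>();
        for (int item = 0; item < savedReadCount; item++) {
            if (!committedChunks.contains(item / chunkSize)) {
                lost.add(item);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        // Four threads each read one chunk of 50 (items 0..199); only chunk 3 commits,
        // and at that moment the counter reads 200. Chunks 0-2 then fail.
        List<Integer> lost = lostOnRestart(50, List.of(3), 200);
        System.out.println("items skipped on restart but never written: " + lost.size());
    }
}
```

In the scenario in main, restarting from the saved count of 200 would skip the 150 items of chunks 0 to 2, which were never written.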