I'm struggling to find a sample of the Process Indicator Pattern. So far I have a source STUDENTS table with a STATUS field that indicates whether a record has already been processed. I'm using a task executor for multithreaded processing.
In my writer, I insert the processed records into a new PROCESSED_STUDENTS table and update the status of those records in the source STUDENTS table to Processed. I do both operations in one transaction block so that the changes are reverted if anything fails.
This is not working with JdbcPagingItemReader: some records are left unprocessed at the end of the run.
Can someone please let me know what I'm missing here?
Reader
@Bean
@StepScope
public ItemReader<SourceData> reader(DataSource dataSource) {
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("status", "ToBeProcessed");
    JdbcPagingItemReader<SourceData> reader = new JdbcPagingItemReader<>();
    reader.setName("Oracle_RCP");
    reader.setDataSource(dataSource);
    reader.setRowMapper(SourceData.rowMapper());
    reader.setParameterValues(parameterValues);
    reader.setPageSize(100);
    reader.setQueryProvider(getQueryProvider(new OraclePagingQueryProvider(),
            "SELECT ID, NAME, CREATED_TIME", "FROM STUDENTS", "WHERE STATUS = :status",
            CREATED_TIME, Order.ASCENDING));
    reader.setSaveState(false);
    try {
        reader.afterPropertiesSet();
    } catch (Exception e) {
        // Pass the exception itself so the logger prints the stack trace
        log.error(e.getMessage(), e);
    }
    return reader;
}

public PagingQueryProvider getQueryProvider(AbstractSqlPagingQueryProvider queryProvider, String select,
        String from, String where, String sortKey, Order order) {
    queryProvider.setSelectClause(select);
    queryProvider.setFromClause(from);
    if (where != null) {
        queryProvider.setWhereClause(where);
    }
    Map<String, Order> sortConfiguration = new HashMap<>();
    sortConfiguration.put(sortKey, order);
    queryProvider.setSortKeys(sortConfiguration);
    return queryProvider;
}
Processor
@Bean
@StepScope
public ItemProcessor<SourceData, OutData> processor(
        @Value("#{jobParameters['processDate']}") String processDate) {
    return new CustomItemProcessor(processDate);
}
Writer
@SuppressWarnings("unchecked")
@Bean
public ItemWriter<OutData> writer(Utils utils) {
    return outDataList -> utils.batchOperation((List<OutData>) outDataList, chunk);
}
Job, Step and TaskExecutor
@Bean("MainJob")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Job mainJob(JobBuilderFactory jobBuilderFactory, Step step) {
    return jobBuilderFactory.get("mainJob")
            .incrementer(new RunIdIncrementer())
            .flow(step)
            .end()
            .build();
}

@Bean
public Step step(StepBuilderFactory stepBuilderFactory) {
    return stepBuilderFactory.get("step")
            .<SourceData, OutData> chunk(chunk)
            .reader(reader(null))
            .processor(processor(null))
            .writer(writer(null))
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
@StepScope
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(50);
    executor.setCorePoolSize(25);
    executor.setQueueCapacity(25);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("MultiThreaded-Executor");
    return executor;
}
The batchOperation method used in the Writer to insert the processed records into the PROCESSED_STUDENTS table and update the processed records in the STUDENTS source table:
public void batchOperation(List<OutData> outDataList, int batchSize) {
    try (Connection con = jdbcTemplate.getDataSource().getConnection()) {
        // Start the transaction block
        con.setAutoCommit(false);
        try (PreparedStatement psInsert = con.prepareStatement("INSERT INTO PROCESSED_STUDENTS (ID, NAME) VALUES (?, ?)");
             PreparedStatement psUpdate = con.prepareStatement("UPDATE STUDENTS SET STATUS = 'Processed' WHERE ID = ?")) {
            int i = 0;
            for (OutData argument : outDataList) {
                psInsert.setLong(1, argument.getId());
                psInsert.setString(2, argument.getName());
                psUpdate.setLong(1, argument.getId());
                psInsert.addBatch();
                psUpdate.addBatch();
                i++;
                if (i % batchSize == 0) {
                    psInsert.executeBatch();
                    psUpdate.executeBatch();
                }
            }
            // Execute the remaining batch when the record count is not a multiple of batchSize
            psInsert.executeBatch();
            psUpdate.executeBatch();
            // End the transaction block: commit the insert and the status update together
            con.commit();
        } catch (SQLException | RuntimeException e) {
            // Revert both changes explicitly; closing the connection alone does not guarantee a rollback
            con.rollback();
            throw e;
        } finally {
            // Set auto-commit back to its default before the connection is released
            con.setAutoCommit(true);
        }
    } catch (SQLException e) {
        log.error(e.getMessage(), e);
        // Rethrow so the step sees the failure instead of treating the chunk as written
        throw new IllegalStateException("Batch write failed", e);
    }
}
You can't use JdbcPagingItemReader in this case, because the writer changes the very column the reader filters on (STATUS). The pagination won't work.
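To make the hazard concrete, here is a simplified in-memory model. It is not Spring Batch's actual query logic: it assumes a reader that re-runs the STATUS filter for every page and skips the rows of the pages it has already returned, while the "writer" flips STATUS between page reads. The names `PagingOverMutatedFilter`, `Student`, `readPage`, `runPaged` and `runSnapshot` are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory model; this is NOT Spring Batch's actual query logic.
final class PagingOverMutatedFilter {

    /** Hypothetical stand-in for a STUDENTS row; only the STATUS flag matters here. */
    static final class Student {
        final long id;
        String status = "ToBeProcessed";

        Student(long id) {
            this.id = id;
        }
    }

    static List<Student> newTable(int rows) {
        List<Student> table = new ArrayList<>();
        for (long id = 1; id <= rows; id++) {
            table.add(new Student(id));
        }
        return table;
    }

    /** Re-evaluates the STATUS filter on every call, then skips page * pageSize matching rows. */
    static List<Student> readPage(List<Student> table, int page, int pageSize) {
        List<Student> matching = new ArrayList<>();
        for (Student s : table) {
            if ("ToBeProcessed".equals(s.status)) {
                matching.add(s);
            }
        }
        int from = Math.min(page * pageSize, matching.size());
        int to = Math.min(from + pageSize, matching.size());
        return new ArrayList<>(matching.subList(from, to));
    }

    static int countUnprocessed(List<Student> table) {
        int count = 0;
        for (Student s : table) {
            if ("ToBeProcessed".equals(s.status)) {
                count++;
            }
        }
        return count;
    }

    /** Pages through the table, flipping STATUS after each page; returns the rows never read. */
    static int runPaged(int rows, int pageSize) {
        List<Student> table = newTable(rows);
        int page = 0;
        List<Student> items = readPage(table, page, pageSize);
        while (!items.isEmpty()) {
            for (Student s : items) {
                s.status = "Processed"; // the "writer" mutates the filter column
            }
            page++;
            items = readPage(table, page, pageSize);
        }
        return countUnprocessed(table);
    }

    /** Evaluates the filter once up front (cursor-style), then processes that fixed result. */
    static int runSnapshot(int rows) {
        List<Student> table = newTable(rows);
        List<Student> snapshot = readPage(table, 0, rows);
        for (Student s : snapshot) {
            s.status = "Processed";
        }
        return countUnprocessed(table);
    }

    public static void main(String[] args) {
        System.out.println(runPaged(10, 2)); // prints 4 (ids 3, 4, 7 and 8 are never read)
        System.out.println(runSnapshot(10)); // prints 0
    }
}
```

In this model every processed page shrinks the set of matching rows, so the next page offset jumps over rows that were never returned. Evaluating the filter only once avoids that, because later updates cannot shift the result that is being iterated.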
Simply use JdbcCursorItemReader instead.
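A minimal sketch of what that reader bean might look like, assuming Spring Batch 4.x (as the question's JobBuilderFactory suggests) and reusing the SourceData.rowMapper() helper from the question. The class name CursorReaderConfig and the reader name studentsCursorReader are made up for illustration. Because JdbcCursorItemReader is not thread-safe and the step in the question runs with a task executor, the sketch wraps it in SynchronizedItemStreamReader; that wrapper is my addition, not something the answer above spells out.

```java
import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; SourceData is the item type from the question.
@Configuration
public class CursorReaderConfig {

    @Bean
    @StepScope
    public SynchronizedItemStreamReader<SourceData> reader(DataSource dataSource) {
        JdbcCursorItemReader<SourceData> cursorReader = new JdbcCursorItemReader<>();
        cursorReader.setName("studentsCursorReader"); // assumed name
        cursorReader.setDataSource(dataSource);
        // The query runs once when the cursor is opened, so later STATUS updates do not shift the result
        cursorReader.setSql("SELECT ID, NAME, CREATED_TIME FROM STUDENTS WHERE STATUS = ? ORDER BY CREATED_TIME");
        cursorReader.setPreparedStatementSetter(ps -> ps.setString(1, "ToBeProcessed"));
        cursorReader.setRowMapper(SourceData.rowMapper());
        // The STATUS column already records progress, so no reader state is saved
        cursorReader.setSaveState(false);

        // Serialize read() calls because several threads of the step share this reader
        SynchronizedItemStreamReader<SourceData> reader = new SynchronizedItemStreamReader<>();
        reader.setDelegate(cursorReader);
        return reader;
    }
}
```

Declaring the concrete return type rather than ItemReader keeps the bean visible as an ItemStream, so the step can open and close the cursor.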