Search code examples
java, spring-boot, spring-batch

Trying to implement Process Indicator Pattern in Spring Batch


I'm struggling to find a sample of the Process Indicator Pattern. So far I have a source STUDENTS table with a STATUS field that indicates whether a record has already been processed. I'm using a task executor for multithreaded processing.

In my writer, I insert the processed records into a new PROCESSED_STUDENTS table and update the status of those records in the source STUDENTS table to Processed. I perform both operations in a single transaction block so that the changes are rolled back if anything fails.

This does not work with JdbcPagingItemReader: some records are still left unprocessed when the job finishes.

Can someone please let me know what I'm missing here?

Reader

/**
 * Builds the step-scoped paging reader that selects {@code ID, NAME, CREATED_TIME} from
 * {@code STUDENTS} for rows whose {@code STATUS} equals {@code "ToBeProcessed"}, ordered by
 * the {@code CREATED_TIME} sort key in ascending order and fetched 100 rows per page.
 *
 * <p>State saving is disabled on the reader ({@code setSaveState(false)}).
 *
 * <p>NOTE(review): paging readers rely on the sort key to locate the next page; if
 * {@code CREATED_TIME} is not unique, rows sharing the boundary value may be skipped —
 * confirm, and consider sorting on a unique column such as the primary key.
 *
 * @param dataSource the data source the paging queries run against
 * @return the configured reader
 * @throws IllegalStateException if the reader's configuration is rejected by
 *         {@code afterPropertiesSet()}
 */
@Bean
@StepScope
public ItemReader<SourceData> reader(DataSource dataSource) {

    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("status", "ToBeProcessed");

    JdbcPagingItemReader<SourceData> reader = new JdbcPagingItemReader<>();
    reader.setName("Oracle_RCP");
    reader.setDataSource(dataSource);
    reader.setRowMapper(SourceData.rowMapper());
    reader.setParameterValues(parameterValues);
    reader.setPageSize(100);
    // CREATED_TIME is a name declared outside this method (presumably a String constant holding the column name).
    reader.setQueryProvider(getQueryProvider(new OraclePagingQueryProvider(), "SELECT ID, NAME, CREATED_TIME", "FROM STUDENTS", "WHERE STATUS = :status", CREATED_TIME, Order.ASCENDING));
    reader.setSaveState(false);

    try {
        reader.afterPropertiesSet();
    } catch (Exception e) {
        // Fail fast: returning a reader whose validation failed would only defer the error
        // to the first read and hide the real cause.
        throw new IllegalStateException("Failed to initialise paging reader 'Oracle_RCP'", e);
    }

    return reader;
}

/**
 * Populates the given paging query provider with the supplied clauses and a single sort key,
 * then returns that same provider instance.
 *
 * @param queryProvider the provider to configure
 * @param select        the select clause
 * @param from          the from clause
 * @param where         the where clause; left unset on the provider when {@code null}
 * @param sortKey       the column used as the only sort key
 * @param order         the sort direction applied to {@code sortKey}
 * @return {@code queryProvider}, after configuration
 */
public PagingQueryProvider getQueryProvider(AbstractSqlPagingQueryProvider queryProvider, String select, String from, String where, String sortKey, Order order) {
    // Single-entry sort configuration: one column, one direction.
    final Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put(sortKey, order);

    queryProvider.setSelectClause(select);
    queryProvider.setFromClause(from);
    if (null != where) {
        queryProvider.setWhereClause(where);
    }
    queryProvider.setSortKeys(sortKeys);

    return queryProvider;
}

Processor

/**
 * Creates the step-scoped item processor, handing it the {@code processDate} job parameter.
 *
 * @param processDate value of the {@code processDate} job parameter, injected by
 *                    the {@code @Value} expression on this parameter
 * @return a new {@code CustomItemProcessor} constructed with {@code processDate}
 */
@Bean
@StepScope
public ItemProcessor<SourceData, OutData> processor(
        @Value("#{jobParameters['processDate']}") String processDate) {
    ItemProcessor<SourceData, OutData> itemProcessor = new CustomItemProcessor(processDate);
    return itemProcessor;
}

Writer

/**
 * Creates the item writer, which forwards each list of items it is asked to write to
 * {@code utils.batchOperation}, passing {@code chunk} as the batch size.
 *
 * @param utils helper whose {@code batchOperation} performs the actual write
 * @return a writer delegating to {@code utils.batchOperation}
 */
@SuppressWarnings("unchecked")
@Bean
public ItemWriter<OutData> writer(Utils utils) {
    return items -> {
        // Unchecked narrowing of the writer's item list to List<OutData>.
        List<OutData> outDataList = (List<OutData>) items;
        utils.batchOperation(outDataList, chunk);
    };
}

Job, Step and TaskExecutor

/**
 * Defines the prototype-scoped {@code MainJob} bean: a job named {@code "mainJob"} that uses a
 * {@code RunIdIncrementer} and whose flow consists of the given step.
 *
 * @param jobBuilderFactory factory used to start building the job
 * @param step              the step the job's flow runs
 * @return the built job
 */
@Bean("MainJob")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Job mainJob(JobBuilderFactory jobBuilderFactory, Step step) {
    RunIdIncrementer runIdIncrementer = new RunIdIncrementer();

    Job job = jobBuilderFactory
            .get("mainJob")
            .incrementer(runIdIncrementer)
            .flow(step)
            .end()
            .build();

    return job;
}

/**
 * Defines the chunk-oriented step named {@code "step"}: it reads {@code SourceData}, processes
 * it into {@code OutData}, writes it, and runs on the task executor from {@code taskExecutor()}.
 * The chunk size comes from {@code chunk}.
 *
 * @param stepBuilderFactory factory used to start building the step
 * @return the built step
 */
@Bean
public Step step(StepBuilderFactory stepBuilderFactory) {
    // The null arguments are placeholders, exactly as in the bean-method calls this replaces.
    ItemReader<SourceData> itemReader = reader(null);
    ItemProcessor<SourceData, OutData> itemProcessor = processor(null);
    ItemWriter<OutData> itemWriter = writer(null);
    ThreadPoolTaskExecutor stepExecutor = taskExecutor();

    return stepBuilderFactory.get("step")
            .<SourceData, OutData> chunk(chunk)
            .reader(itemReader)
            .processor(itemProcessor)
            .writer(itemWriter)
            .taskExecutor(stepExecutor)
            .build();
}

/**
 * Creates the step-scoped thread pool used by the step: core size 25, maximum size 50,
 * queue capacity 25, thread names prefixed with {@code "MultiThreaded-Executor"}, and a
 * {@code CallerRunsPolicy} as the rejected-execution handler.
 *
 * @return the configured executor
 */
@Bean
@StepScope
public ThreadPoolTaskExecutor taskExecutor() {
    final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

    pool.setThreadNamePrefix("MultiThreaded-Executor");

    // Sizing
    pool.setCorePoolSize(25);
    pool.setMaxPoolSize(50);
    pool.setQueueCapacity(25);

    // Handler applied to tasks the pool rejects
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    return pool;
}

batchOperation method used in the Writer to insert the processed records in the PROCESSED_STUDENTS table and update the processed records in the STUDENTS source table:

/**
 * Inserts every item into {@code PROCESSED_STUDENTS} and marks the matching {@code STUDENTS}
 * row as {@code 'Processed'}, all inside one JDBC transaction on a connection obtained from
 * {@code jdbcTemplate}'s data source.
 *
 * <p>Statements are sent to the database every {@code batchSize} items, with a final flush for
 * any remainder. Nothing is committed until all items have been sent. On any failure the
 * transaction is rolled back and the failure is rethrown, so the caller can see that the
 * write did not happen.
 *
 * <p>NOTE(review): this connection is obtained directly from the data source, so it is
 * presumably independent of any transaction the caller has open — confirm this is intended.
 *
 * @param outDataList items to persist; {@code null} or empty is a no-op
 * @param batchSize   number of items per JDBC batch flush; a non-positive value means
 *                    "flush once, after the last item"
 * @throws IllegalStateException if any JDBC operation fails; the original exception is the
 *         cause, and any rollback/cleanup failure is attached to it as suppressed
 */
public void batchOperation(List<OutData> outDataList, int batchSize) {
    if (outDataList == null || outDataList.isEmpty()) {
        return;
    }

    // Guard against batchSize <= 0, which would otherwise divide by zero or never flush mid-loop.
    final int flushEvery = batchSize > 0 ? batchSize : outDataList.size();

    try (
            Connection con = jdbcTemplate.getDataSource().getConnection();
            PreparedStatement psInsert = con.prepareStatement("INSERT INTO PROCESSED_STUDENTS (ID, NAME) VALUES (?, ?)");
            PreparedStatement psUpdate = con.prepareStatement("UPDATE STUDENTS SET STATUS = 'Processed' WHERE ID = ?")) {

        // Remember the connection's mode so it goes back to its source in the state we found it.
        final boolean previousAutoCommit = con.getAutoCommit();

        // Starting transaction block
        con.setAutoCommit(false);

        try {
            int pending = 0;
            for (OutData argument : outDataList) {
                psInsert.setLong(1, argument.getId());
                psInsert.setString(2, argument.getName());
                psInsert.addBatch();

                psUpdate.setLong(1, argument.getId());
                psUpdate.addBatch();

                pending++;
                if (pending == flushEvery) {
                    psInsert.executeBatch();
                    psUpdate.executeBatch();
                    pending = 0;
                }
            }

            // Flush the remainder when the item count is not a multiple of the batch size
            if (pending > 0) {
                psInsert.executeBatch();
                psUpdate.executeBatch();
            }

            // End transaction block, commit changes
            con.commit();
            con.setAutoCommit(previousAutoCommit);
        } catch (Exception e) {
            // Undo everything sent so far; never rely on what close() does with open work.
            try {
                con.rollback();
                con.setAutoCommit(previousAutoCommit);
            } catch (Exception cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }

    } catch (Exception e) {
        throw new IllegalStateException(
                "Failed to write " + outDataList.size() + " processed student record(s); transaction rolled back", e);
    }
}

Solution

  • You can't use the JdbcPagingItemReader in this case because the writer updates the column the reader filters on (STATUS) while the job is still reading, so the pagination won't work reliably.

    Simply use JdbcCursorItemReader instead.

    See also: (Spring Batch) Not all records are proccessed