Search code examples
javaspringmultithreadingspring-batchbatch-processing

Spring Boot Batch - Stop and Start a multithreaded step with CompositeItemWriter


I am trying to stop and start a multithreaded step through Scheduler. But I am getting exception as

Caused by: org.springframework.dao.InvalidDataAccessResourceUsageException: Unexpected cursor position change.

If I understand correctly we wont be able to restart an multithreaded step. But I am not restarting. I stop the job by stepExecution.setTerminateOnly() through ChunkListener() and trying to start this by jobLauncher.run() in a scheduler. Here is my codes;

public class BatchConfiguration {
    
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
        
    @Autowired
    public DataSource dataSource;
    
        @Bean
        public TaskExecutor taskExecutor(){
            SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("user_purge");
            asyncTaskExecutor.setConcurrencyLimit(5);
            return asyncTaskExecutor;
        }
    
        @Bean
        public Job userPurgeJob() {
            return jobBuilderFactory.get("userPurgeJob")
                    .start(userPurgeStep())
                    .listener(new JobLoggerListener())
                    .build();
        }   
            
        @Bean
        public Step userPurgeStep() {
            return stepBuilderFactory.get("userPurgeStep")
                    .<UserInfo, UserInfo> chunk(10)
                    .reader(userPurgeReader())
                    .writer(compositePurgeWriter())
                    .listener(new StopListener())
                    .taskExecutor(taskExecutor())
                    .build();
        }
        
        
        @Bean
        @StepScope
        public JdbcCursorItemReader<UserInfo> userPurgeReader(){
            JdbcCursorItemReader<UserInfo> reader = new JdbcCursorItemReader<UserInfo>();
            reader.setDataSource(dataSource);
            reader.setSql("SELECT user_id, user_status "
                    + "FROM db3.user_purge "
                    + "WHERE user_status = 'I' "
                    + "AND purge_status = 'N'");
            reader.setRowMapper(new SoftDeleteMapper());
      
            return reader;
        }
        
        @Bean
        public CompositeItemWriter<UserInfo> compositePurgeWriter() {
            CompositeItemWriter<UserInfo> compositeItemWriter = new CompositeItemWriter<>();
            compositeItemWriter.setDelegates(Arrays.asList(delMasterWriter(), delTableWriter()));
            return compositeItemWriter;
        }
        
            
        @Bean
        @StepScope
        public JdbcBatchItemWriter<UserInfo> delMasterWriter() {
            JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
            writer.setSql("UPDATE db3.userinfo "
                    + "SET user_status = :userStatus, "
                        + "updated = NOW() "
                        + "WHERE user_id = :userId");
            writer.setDataSource(dataSource);
            
            return writer;
        }
        
        @Bean
        @StepScope
        public JdbcBatchItemWriter<UserInfo> delTableWriter() {
            JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
            writer.setSql("UPDATE db3.user_purge SET purge_status = 'S', del_date = NOW() WHERE user_id = :userId");
            writer.setDataSource(dataSource);
            return writer;
        }
}

StopListener.java This ChunkListner class implementation is used to terminate the execution any time other than between 10pm and 6am

public class StopListener implements ChunkListener{
    private StepExecution stepExecution;
    
    @Autowired
    AppUtils appUtils;
    
    @Override
    public void beforeChunk(ChunkContext context) {
        
    }
    
    @Override
    public void afterChunk(ChunkContext context) {
        if (stopJob()) {
            this.stepExecution.setTerminateOnly();            
        }       
    }
    
    @Override
    public void afterChunkError(ChunkContext context) {
                
    }

    //Check the time between 10pm and 6am
    private boolean terminateJob() {
        Date date = new Date();
        Calendar calendar = GregorianCalendar.getInstance(); 
        calendar.setTime(date); 
        calendar.get(Calendar.HOUR_OF_DAY);
        
        if(calendar.get(Calendar.HOUR_OF_DAY) >= 6 
                && calendar.get(Calendar.HOUR_OF_DAY) < 22) {           
            return true;
        }else {
            return false;
        }
    }

}

And finally my scheduler method in application class. I am using CommandLneRunner to accept arguments.

@SpringBootApplication
@EnableScheduling
public class UserPurgeBatchApplication implements CommandLineRunner{
    static final Logger LOG = LogManager.getLogger(UserPurgeBatchApplication.class);
    
    @Autowired
    private JobLauncher jobLauncher;
    
    @Autowired
    private ApplicationContext context;
    
    @Autowired
    private JobRepository jobRepository;
    
    @Autowired
    private JobOperator jobOperator;
    
    private String jobName;
    private JobParameters jobParameters;
    private String inputFile;
    private String usertype;
    private boolean jobStatus = false;
    private String completionStatus;    

    public static void main(String[] args) throws Exception{
        SpringApplication.run(UserPurgeBatchApplication.class, args);           
        
    }
    
    @Override
    public void run(String... args) throws Exception {
        this.jobName = args[0];
        
        this.jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
                
        
        LOG.info("||| Launching the JOB: " + jobName);  
        this.completionStatus  = jobSelector(jobName, jobParameters).getExitCode();
        LOG.info(">>> JOB completed with status: " + this.completionStatus);
    }


    public ExitStatus jobSelector(String jobName, JobParameters jobParameters) {        
        Job job = this.context.getBean(jobName, Job.class);
        
        try {
            return this.jobLauncher.run(job,  jobParameters).getExitStatus();
        } catch (JobExecutionAlreadyRunningException | 
                JobRestartException | 
                JobInstanceAlreadyCompleteException | 
                JobParametersInvalidException e) {
            
            e.printStackTrace();
        }
        
        return new ExitStatus("FAILED");
    }
    
    
    @Scheduled(cron = "0 0/30 22-23,23,0-6 * * *")
    public void batchStartScheduler() {
        LOG.info("---Beginning of batchScheduler()---");
        
        Long lastExecutionID = jobRepository.getLastJobExecution(jobName, jobParameters).getId();
        String jobStatus = jobRepository.getLastJobExecution(jobName, jobParameters).getStatus().toString();
        Job job = this.context.getBean(jobName, Job.class);
        
        if(!jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_COMPLETED)) {
            if(jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_STOPPED) 
                    || jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_FAILED)) {
                    
                try {
                LOG.info("|||Starting the Job...");
                    this.jobParameters = new JobParametersBuilder(jobParameters)
                            .addLong("time", System.currentTimeMillis())
                            .toJobParameters();
                    
                    this.jobLauncher.run(job,  jobParameters);
                } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
                        | JobParametersInvalidException e) {
                    e.printStackTrace();
                }
            }
        }else {
            LOG.info("Scheduler not executed!!");           
        }
                
        LOG.info("---End of batchScheduler()---");
    }
    
}

There are some confusions. Will run method always try to restart the previous executions if it is failed? Because I could see this was still restarting and that can be the reason for this. I tried to provide new JobParameter hoping it will just launch it again. I hope my stopping method from ChunkListener is ok. But somehow I want to start this job again from Scheduler and I definitely need a multihreaded step. I also hope a CompositeWriter in a Multithreaded step is also fine. A help would be greatly appreciated. Thanks in advance!

Update : Finally I could make it work by adding reader.setVerifyCursorPosition(false). But I think I need to use thread safe Reader as suggested by Mahmoud Ben Hassine. So I am trying to use JdbcPagingItemReader but getting error as "sortKey must be specified". I think I have specified it but not sure it is correct. Here is my JdbcPagingItemReader

@Bean
public JdbcPagingItemReader<UserInfo> jdbcPagingItemReader() {
    JdbcPagingItemReader<UserInfo> pagingItemReader = new JdbcPagingItemReader<>();

    pagingItemReader.setDataSource(dataSource);
    pagingItemReader.setFetchSize(3);
    pagingItemReader.setRowMapper(new SoftDeleteMapper());

    MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
    mySqlPagingQueryProvider.setSelectClause("SELECT user_id, user_status");
    mySqlPagingQueryProvider.setFromClause("FROM db3.user_purge");
    mySqlPagingQueryProvider.setWhereClause( "WHERE user_status = 'I' "
                                                + "AND purge_status = 'N'");

    Map<String, Order> orderByKeys = new HashMap<>();
    orderByKeys.put("user_id", Order.ASCENDING);

    mySqlPagingQueryProvider.setSortKeys(orderByKeys);
    pagingItemReader.setQueryProvider(mySqlPagingQueryProvider);

    return pagingItemReader;
}

My Updated Step

@Bean
public Step userPurgeStep() {
    return stepBuilderFactory.get("userPurgeStep")
            .<UserInfo, UserInfo> chunk(10)
            .reader(jdbcPagingItemReader())
            .writer(compositeSoftDelWriter())
            .listener(new StopListener())
            .taskExecutor(taskExecutor())
            .build();
}

Solution

  • Multi-threading is not compatible with restarts. As mentioned in the Javadoc, you should set saveState to false if you use the JdbcCursorItemReader in a multi-threaded step.

    Moreover, the JdbcCursorItemReader is not thread-safe as it wraps a ResultSet object which is not thread safe and also because it inherits from AbstractItemCountingItemStreamItemReader which is not thread safe neither. So using it in a multi-threaded step is incorrect. This is actually the cause of your issue Unexpected cursor position change. Concurrent threads are modifying the cursor position inadvertently.

    You need to synchronize access to the reader by wrapping it in a SynchronizedIteamStreamReader or use a JdbcPagingItemReader which is thread safe.

    EDIT: Add example with JdbcPagingItemReader

    Here is a self-contained docker-based sample:

    import java.util.HashMap;
    import java.util.Map;
    
    import javax.sql.DataSource;
    
    import com.mysql.cj.jdbc.MysqlDataSource;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.ClassRule;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.testcontainers.containers.MySQLContainer;
    import org.testcontainers.utility.DockerImageName;
    
    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.item.database.JdbcPagingItemReader;
    import org.springframework.batch.item.database.Order;
    import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ByteArrayResource;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.jdbc.core.BeanPropertyRowMapper;
    import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @ContextConfiguration
    public class SO67614305 {
    
        private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.0.24");
    
        @ClassRule
        public static MySQLContainer<?> mysql = new MySQLContainer<>(MYSQL_IMAGE);
        
        @Autowired
        private DataSource dataSource;
        @Autowired
        private JobLauncher jobLauncher;
        @Autowired
        private Job job;
        
        @Before
        public void setUp() {
            String schema = "/org/springframework/batch/core/schema-mysql.sql";
            String data = // the script is inline here to have a self contained example
                    "create table person (ID int not null primary key, name varchar(20));" +
                    "insert into person values (1, 'foo1'); insert into person values (2, 'foo2');" +
                    "insert into person values (3, 'foo3'); insert into person values (4, 'foo4');";
            ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
            databasePopulator.addScript(new ClassPathResource(schema));
            databasePopulator.addScript(new ByteArrayResource(data.getBytes()));
            databasePopulator.execute(this.dataSource);
        }
    
        @Test
        public void testJob() throws Exception {
            // given
            JobParameters jobParameters = new JobParameters();
            
            // when
            JobExecution jobExecution = this.jobLauncher.run(this.job, jobParameters);
    
            // then
            Assert.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
        }
    
        @Configuration
        @EnableBatchProcessing
        static class TestConfiguration {
    
            @Bean
            public DataSource dataSource() throws Exception {
                MysqlDataSource datasource = new MysqlDataSource();
                datasource.setURL(mysql.getJdbcUrl());
                datasource.setUser(mysql.getUsername());
                datasource.setPassword(mysql.getPassword());
                datasource.setUseSSL(false);
                return datasource;
            }
    
            @Bean
            public JdbcPagingItemReader<Person> jdbcPagingItemReader() throws Exception {
                MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
                mySqlPagingQueryProvider.setSelectClause("SELECT id, name");
                mySqlPagingQueryProvider.setFromClause("FROM person");
                Map<String, Order> orderByKeys = new HashMap<>();
                orderByKeys.put("id", Order.DESCENDING);
                mySqlPagingQueryProvider.setSortKeys(orderByKeys);
    
                JdbcPagingItemReader<Person> pagingItemReader = new JdbcPagingItemReader<>();
                pagingItemReader.setDataSource(dataSource());
                pagingItemReader.setFetchSize(2);
                pagingItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
                pagingItemReader.setQueryProvider(mySqlPagingQueryProvider);
                return pagingItemReader;
            }
    
            @Bean
            public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) throws Exception {
                return jobs.get("job")
                        .start(steps.get("step").chunk(2)
                                .reader(jdbcPagingItemReader())
                                .writer(items -> items.forEach(System.out::println))
                                .build())
                        .build();
            }
    
            static class Person {
                int id;
                String name;
    
                public int getId() {
                    return id;
                }
    
                public void setId(int id) {
                    this.id = id;
                }
    
                public String getName() {
                    return name;
                }
    
                public void setName(String name) {
                    this.name = name;
                }
    
                @Override
                public String toString() {
                    return "Person{" +
                            "id=" + id +
                            ", name='" + name + '\'' +
                            '}';
                }
            }
    
        }
    }
    

    This prints items in the descending order as expected without complaining about the missing sort key:

    Person{id=4, name='foo4'}
    Person{id=3, name='foo3'}
    Person{id=2, name='foo2'}
    Person{id=1, name='foo1'}