Spring Boot Batch - Stop and Start a multithreaded step with CompositeItemWriter

I am trying to stop and start a multithreaded step through a scheduler, but I am getting the following exception:

Caused by: org.springframework.dao.InvalidDataAccessResourceUsageException: Unexpected cursor position change.

If I understand correctly, we won't be able to restart a multithreaded step. But I am not restarting: I stop the job by calling stepExecution.setTerminateOnly() from a ChunkListener and then try to start it again from a scheduler. Here is my code:

public class BatchConfiguration {
    private JobBuilderFactory jobBuilderFactory;
    private StepBuilderFactory stepBuilderFactory;
    public DataSource dataSource;
        public TaskExecutor taskExecutor(){
            SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("user_purge");
            return asyncTaskExecutor;
        public Job userPurgeJob() {
            return jobBuilderFactory.get("userPurgeJob")
                    .listener(new JobLoggerListener())
        public Step userPurgeStep() {
            return stepBuilderFactory.get("userPurgeStep")
                    .<UserInfo, UserInfo> chunk(10)
                    .listener(new StopListener())
        // Builds the cursor-based reader that selects the rows to purge from db3.user_purge
        // (user_status = 'I' and purge_status = 'N') and maps each row with SoftDeleteMapper.
        // NOTE(review): no setDataSource(...) call is visible in this snippet — confirm it is set.
        // NOTE(review): a JdbcCursorItemReader works on a single JDBC cursor; if this one reader
        // is shared by the threads of a multi-threaded step, access to it needs to be
        // synchronized — confirm against the Spring Batch Javadoc.
        public JdbcCursorItemReader<UserInfo> userPurgeReader(){
            JdbcCursorItemReader<UserInfo> reader = new JdbcCursorItemReader<UserInfo>();
            reader.setSql("SELECT user_id, user_status "
                    + "FROM db3.user_purge "
                    + "WHERE user_status = 'I' "
                    + "AND purge_status = 'N'");
            reader.setRowMapper(new SoftDeleteMapper());
            return reader;
        // Combines the two JDBC writers into a single writer for the step. The delegates are
        // registered in the order delMasterWriter() first, then delTableWriter().
        public CompositeItemWriter<UserInfo> compositePurgeWriter() {
            CompositeItemWriter<UserInfo> compositeItemWriter = new CompositeItemWriter<>();
            compositeItemWriter.setDelegates(Arrays.asList(delMasterWriter(), delTableWriter()));
            return compositeItemWriter;
        // Builds the batch writer that updates db3.userinfo for each item: it sets user_status
        // from the :userStatus parameter, refreshes the updated column with NOW(), and matches
        // the row on :userId. The named parameters are supplied through
        // BeanPropertyItemSqlParameterSourceProvider.
        // NOTE(review): no setDataSource(...) call is visible in this snippet — confirm it is set.
        public JdbcBatchItemWriter<UserInfo> delMasterWriter() {
            JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
            writer.setSql("UPDATE db3.userinfo "
                    + "SET user_status = :userStatus, "
                        + "updated = NOW() "
                        + "WHERE user_id = :userId");
            return writer;
        // Builds the batch writer that marks a row of db3.user_purge as processed: it sets
        // purge_status to 'S' and del_date to NOW() for the row matching :userId. The named
        // parameter is supplied through BeanPropertyItemSqlParameterSourceProvider.
        // NOTE(review): no setDataSource(...) call is visible in this snippet — confirm it is set.
        public JdbcBatchItemWriter<UserInfo> delTableWriter() {
            JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
            writer.setSql("UPDATE db3.user_purge SET purge_status = 'S', del_date = NOW() WHERE user_id = :userId");
            return writer;
} This ChunkListener implementation is used to terminate the execution at any time other than between 10pm and 6am.

public class StopListener implements ChunkListener{
    private StepExecution stepExecution;
    AppUtils appUtils;
    public void beforeChunk(ChunkContext context) {
    public void afterChunk(ChunkContext context) {
        if (stopJob()) {
    public void afterChunkError(ChunkContext context) {

    /**
     * Decides, from the current hour of day, whether the job should be stopped.
     *
     * Returns true when Calendar.HOUR_OF_DAY is at least 6 and below 22, i.e. outside the
     * 10pm-6am processing window, and false otherwise.
     *
     * NOTE(review): afterChunk() calls stopJob(), not terminateJob() — confirm which name
     * is intended.
     */
    private boolean terminateJob() {
        // NOTE(review): this local is never read in the visible code.
        Date date = new Date();
        Calendar calendar = GregorianCalendar.getInstance(); 
        if(calendar.get(Calendar.HOUR_OF_DAY) >= 6 
                && calendar.get(Calendar.HOUR_OF_DAY) < 22) {           
            return true;
        }else {
            return false;


And finally, my scheduler method in the application class. I am using CommandLineRunner to accept arguments.

public class UserPurgeBatchApplication implements CommandLineRunner{
    static final Logger LOG = LogManager.getLogger(UserPurgeBatchApplication.class);
    private JobLauncher jobLauncher;
    private ApplicationContext context;
    private JobRepository jobRepository;
    private JobOperator jobOperator;
    private String jobName;
    private JobParameters jobParameters;
    private String inputFile;
    private String usertype;
    private boolean jobStatus = false;
    private String completionStatus;    

    public static void main(String[] args) throws Exception{, args);           
    public void run(String... args) throws Exception {
        this.jobName = args[0];
        this.jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
       "||| Launching the JOB: " + jobName);  
        this.completionStatus  = jobSelector(jobName, jobParameters).getExitCode();">>> JOB completed with status: " + this.completionStatus);

    public ExitStatus jobSelector(String jobName, JobParameters jobParameters) {        
        Job job = this.context.getBean(jobName, Job.class);
        try {
            return,  jobParameters).getExitStatus();
        } catch (JobExecutionAlreadyRunningException | 
                JobRestartException | 
                JobInstanceAlreadyCompleteException | 
                JobParametersInvalidException e) {
        return new ExitStatus("FAILED");
    // Scheduled re-launch of the job named by jobName: looks up the last execution for the
    // current jobParameters and, depending on its status, launches the job again with a
    // refreshed "time" parameter.
    // Cron fields are (second minute hour day-of-month month day-of-week): this fires at
    // minute 0 and 30 of hours 22, 23 and 0-6. The extra ",23" repeats an hour that "22-23"
    // already covers.
    // NOTE(review): hour 6 is included here, while StopListener.terminateJob() returns true
    // for hours >= 6, so the 06:00 and 06:30 launches would presumably be stopped right
    // away — confirm whether "0-5" was intended.
    // NOTE(review): the result of getLastJobExecution(...) is dereferenced without a null
    // check — confirm it cannot be null when this method runs.
    @Scheduled(cron = "0 0/30 22-23,23,0-6 * * *")
    public void batchStartScheduler() {"---Beginning of batchScheduler()---");
        Long lastExecutionID = jobRepository.getLastJobExecution(jobName, jobParameters).getId();
        String jobStatus = jobRepository.getLastJobExecution(jobName, jobParameters).getStatus().toString();
        Job job = this.context.getBean(jobName, Job.class);
        if(!jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_COMPLETED)) {
                    || jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_FAILED)) {
                try {
      "|||Starting the Job...");
                    this.jobParameters = new JobParametersBuilder(jobParameters)
                            .addLong("time", System.currentTimeMillis())
          ,  jobParameters);
                } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
                        | JobParametersInvalidException e) {
        }else {
  "Scheduler not executed!!");           
               "---End of batchScheduler()---");

I am confused about a few things. Will the run method always try to restart the previous execution if it failed? I could see that the job was still being restarted, and that may be the reason for this error. I tried to provide new JobParameters, hoping it would simply launch the job again. I hope my way of stopping the job from the ChunkListener is OK. But I still want to start this job again from the scheduler, and I definitely need a multithreaded step. I also hope that using a CompositeItemWriter in a multithreaded step is fine. Any help would be greatly appreciated. Thanks in advance!

Update: Finally I could make it work by adding reader.setVerifyCursorPosition(false). But I think I need to use a thread-safe reader, as suggested by Mahmoud Ben Hassine. So I am trying to use JdbcPagingItemReader, but I am getting the error "sortKey must be specified". I think I have specified it, but I am not sure it is correct. Here is my JdbcPagingItemReader:

// Builds a paging reader over db3.user_purge (user_status = 'I' and purge_status = 'N')
// using a MySQL paging query provider, with user_id ascending as the intended sort key.
// NOTE(review): in this snippet orderByKeys is populated but never passed to
// mySqlPagingQueryProvider.setSortKeys(...), and neither the query provider nor a data
// source is set on pagingItemReader. If those calls are really absent (and not just lost
// from the paste), that would explain the "sortKey must be specified" error — confirm.
public JdbcPagingItemReader<UserInfo> jdbcPagingItemReader() {
    JdbcPagingItemReader<UserInfo> pagingItemReader = new JdbcPagingItemReader<>();

    pagingItemReader.setRowMapper(new SoftDeleteMapper());

    MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
    mySqlPagingQueryProvider.setSelectClause("SELECT user_id, user_status");
    mySqlPagingQueryProvider.setFromClause("FROM db3.user_purge");
    mySqlPagingQueryProvider.setWhereClause( "WHERE user_status = 'I' "
                                                + "AND purge_status = 'N'");

    Map<String, Order> orderByKeys = new HashMap<>();
    orderByKeys.put("user_id", Order.ASCENDING);


    return pagingItemReader;

My Updated Step

public Step userPurgeStep() {
    return stepBuilderFactory.get("userPurgeStep")
            .<UserInfo, UserInfo> chunk(10)
            .listener(new StopListener())


  • Multi-threading is not compatible with restarts. As mentioned in the Javadoc, you should set saveState to false if you use the JdbcCursorItemReader in a multi-threaded step.

    Moreover, the JdbcCursorItemReader is not thread-safe: it wraps a ResultSet object, which is not thread-safe, and it inherits from AbstractItemCountingItemStreamItemReader, which is not thread-safe either. So using it in a multi-threaded step is incorrect. This is actually the cause of your issue, "Unexpected cursor position change": concurrent threads are modifying the cursor position inadvertently.

    You need to synchronize access to the reader by wrapping it in a SynchronizedItemStreamReader, or use a JdbcPagingItemReader, which is thread-safe.

    EDIT: Add example with JdbcPagingItemReader

    Here is a self-contained docker-based sample:

    import java.util.HashMap;
    import java.util.Map;
    import javax.sql.DataSource;
    import com.mysql.cj.jdbc.MysqlDataSource;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.ClassRule;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.testcontainers.containers.MySQLContainer;
    import org.testcontainers.utility.DockerImageName;
    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.item.database.JdbcPagingItemReader;
    import org.springframework.batch.item.database.Order;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jdbc.core.BeanPropertyRowMapper;
    import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringRunner;
    public class SO67614305 {
        private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.0.24");
        public static MySQLContainer<?> mysql = new MySQLContainer<>(MYSQL_IMAGE);
        private DataSource dataSource;
        private JobLauncher jobLauncher;
        private Job job;
        public void setUp() {
            String schema = "/org/springframework/batch/core/schema-mysql.sql";
            String data = // the script is inline here to have a self contained example
                    "create table person (ID int not null primary key, name varchar(20));" +
                    "insert into person values (1, 'foo1'); insert into person values (2, 'foo2');" +
                    "insert into person values (3, 'foo3'); insert into person values (4, 'foo4');";
            ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
            databasePopulator.addScript(new ClassPathResource(schema));
            databasePopulator.addScript(new ByteArrayResource(data.getBytes()));
        public void testJob() throws Exception {
            // given
            JobParameters jobParameters = new JobParameters();
            // when
            JobExecution jobExecution =, jobParameters);
            // then
            Assert.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
        static class TestConfiguration {
            public DataSource dataSource() throws Exception {
                MysqlDataSource datasource = new MysqlDataSource();
                return datasource;
            public JdbcPagingItemReader<Person> jdbcPagingItemReader() throws Exception {
                MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
                mySqlPagingQueryProvider.setSelectClause("SELECT id, name");
                mySqlPagingQueryProvider.setFromClause("FROM person");
                Map<String, Order> orderByKeys = new HashMap<>();
                orderByKeys.put("id", Order.DESCENDING);
                JdbcPagingItemReader<Person> pagingItemReader = new JdbcPagingItemReader<>();
                pagingItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
                return pagingItemReader;
            public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) throws Exception {
                return jobs.get("job")
                                .writer(items -> items.forEach(System.out::println))
            static class Person {
                int id;
                String name;
                public int getId() {
                    return id;
                public void setId(int id) {
           = id;
                public String getName() {
                    return name;
                public void setName(String name) {
           = name;
                public String toString() {
                    return "Person{" +
                            "id=" + id +
                            ", name='" + name + '\'' +

    This prints items in the descending order as expected without complaining about the missing sort key:

    Person{id=4, name='foo4'}
    Person{id=3, name='foo3'}
    Person{id=2, name='foo2'}
    Person{id=1, name='foo1'}