I am trying to stop and start a multi-threaded step through a scheduler, but I am getting the following exception:
Caused by: org.springframework.dao.InvalidDataAccessResourceUsageException: Unexpected cursor position change.
If I understand correctly, a multi-threaded step cannot be restarted. But I am not restarting: I stop the job by calling stepExecution.setTerminateOnly() from a ChunkListener and then try to start it again with jobLauncher.run() in a scheduler. Here is my code:
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
public DataSource dataSource;
@Bean
public TaskExecutor taskExecutor(){
SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("user_purge");
asyncTaskExecutor.setConcurrencyLimit(5);
return asyncTaskExecutor;
}
@Bean
public Job userPurgeJob() {
return jobBuilderFactory.get("userPurgeJob")
.start(userPurgeStep())
.listener(new JobLoggerListener())
.build();
}
@Bean
public Step userPurgeStep() {
return stepBuilderFactory.get("userPurgeStep")
.<UserInfo, UserInfo> chunk(10)
.reader(userPurgeReader())
.writer(compositePurgeWriter())
.listener(new StopListener())
.taskExecutor(taskExecutor())
.build();
}
@Bean
@StepScope
public JdbcCursorItemReader<UserInfo> userPurgeReader(){
JdbcCursorItemReader<UserInfo> reader = new JdbcCursorItemReader<UserInfo>();
reader.setDataSource(dataSource);
reader.setSql("SELECT user_id, user_status "
+ "FROM db3.user_purge "
+ "WHERE user_status = 'I' "
+ "AND purge_status = 'N'");
reader.setRowMapper(new SoftDeleteMapper());
return reader;
}
@Bean
public CompositeItemWriter<UserInfo> compositePurgeWriter() {
CompositeItemWriter<UserInfo> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(delMasterWriter(), delTableWriter()));
return compositeItemWriter;
}
@Bean
@StepScope
public JdbcBatchItemWriter<UserInfo> delMasterWriter() {
JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.setSql("UPDATE db3.userinfo "
+ "SET user_status = :userStatus, "
+ "updated = NOW() "
+ "WHERE user_id = :userId");
writer.setDataSource(dataSource);
return writer;
}
@Bean
@StepScope
public JdbcBatchItemWriter<UserInfo> delTableWriter() {
JdbcBatchItemWriter<UserInfo> writer = new JdbcBatchItemWriter<UserInfo>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.setSql("UPDATE db3.user_purge SET purge_status = 'S', del_date = NOW() WHERE user_id = :userId");
writer.setDataSource(dataSource);
return writer;
}
}
StopListener.java: this ChunkListener implementation is used to terminate the execution at any time outside the 10pm-6am window.
public class StopListener implements ChunkListener {

    @Autowired
    AppUtils appUtils;

    @Override
    public void beforeChunk(ChunkContext context) {
    }

    @Override
    public void afterChunk(ChunkContext context) {
        if (terminateJob()) {
            // obtain the StepExecution from the chunk context and ask the step to stop
            context.getStepContext().getStepExecution().setTerminateOnly();
        }
    }

    @Override
    public void afterChunkError(ChunkContext context) {
    }

    // Returns true when the current time is outside the 10pm-6am window
    private boolean terminateJob() {
        Calendar calendar = GregorianCalendar.getInstance();
        calendar.setTime(new Date());
        int hour = calendar.get(Calendar.HOUR_OF_DAY);
        return hour >= 6 && hour < 22;
    }
}
And finally, my scheduler method in the application class. I am using CommandLineRunner to accept arguments.
@SpringBootApplication
@EnableScheduling
public class UserPurgeBatchApplication implements CommandLineRunner{
static final Logger LOG = LogManager.getLogger(UserPurgeBatchApplication.class);
@Autowired
private JobLauncher jobLauncher;
@Autowired
private ApplicationContext context;
@Autowired
private JobRepository jobRepository;
@Autowired
private JobOperator jobOperator;
private String jobName;
private JobParameters jobParameters;
private String inputFile;
private String usertype;
private boolean jobStatus = false;
private String completionStatus;
public static void main(String[] args) throws Exception{
SpringApplication.run(UserPurgeBatchApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
this.jobName = args[0];
this.jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
LOG.info("||| Launching the JOB: " + jobName);
this.completionStatus = jobSelector(jobName, jobParameters).getExitCode();
LOG.info(">>> JOB completed with status: " + this.completionStatus);
}
public ExitStatus jobSelector(String jobName, JobParameters jobParameters) {
Job job = this.context.getBean(jobName, Job.class);
try {
return this.jobLauncher.run(job, jobParameters).getExitStatus();
} catch (JobExecutionAlreadyRunningException |
JobRestartException |
JobInstanceAlreadyCompleteException |
JobParametersInvalidException e) {
e.printStackTrace();
}
return new ExitStatus("FAILED");
}
@Scheduled(cron = "0 0/30 22-23,0-6 * * *")
public void batchStartScheduler() {
LOG.info("---Beginning of batchScheduler()---");
JobExecution lastExecution = jobRepository.getLastJobExecution(jobName, jobParameters);
Long lastExecutionID = lastExecution.getId();
String jobStatus = lastExecution.getStatus().toString();
Job job = this.context.getBean(jobName, Job.class);
if(!jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_COMPLETED)) {
if(jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_STOPPED)
|| jobStatus.equals(ApplicationConstants.JOB_EXITSTATUS_FAILED)) {
try {
LOG.info("|||Starting the Job...");
this.jobParameters = new JobParametersBuilder(jobParameters)
.addLong("time", System.currentTimeMillis())
.toJobParameters();
this.jobLauncher.run(job, jobParameters);
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
| JobParametersInvalidException e) {
e.printStackTrace();
}
}
}else {
LOG.info("Scheduler not executed!!");
}
LOG.info("---End of batchScheduler()---");
}
}
There are a few things I am confused about. Will jobLauncher.run() always try to restart the previous execution if it failed? I could see it was still restarting, and that may be the reason for the exception. I tried to provide new JobParameters hoping it would simply launch a new execution. I hope my way of stopping the job from the ChunkListener is OK, but I need to be able to start this job again from the scheduler, and I definitely need a multi-threaded step. I also hope a CompositeItemWriter is fine in a multi-threaded step. Any help would be greatly appreciated. Thanks in advance!
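For reference, here is what I expected jobLauncher.run() to do (a minimal sketch with hypothetical jobLauncher, job and lastExecution references, not code from my project):
// Case 1: launching with the SAME identifying parameters as a STOPPED/FAILED execution
// is treated by Spring Batch as a restart of that JobInstance.
void restartPrevious(JobLauncher jobLauncher, Job job, JobExecution lastExecution) throws Exception {
    jobLauncher.run(job, lastExecution.getJobParameters());
}

// Case 2: launching with DIFFERENT identifying parameters (e.g. a new "time" value)
// creates a brand-new JobInstance instead of restarting the old one.
void startFresh(JobLauncher jobLauncher, Job job) throws Exception {
    JobParameters freshParams = new JobParametersBuilder()
            .addLong("time", System.currentTimeMillis())
            .toJobParameters();
    jobLauncher.run(job, freshParams);
}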
Update: I was finally able to make it work by adding reader.setVerifyCursorPosition(false). But I think I need to use a thread-safe reader, as suggested by Mahmoud Ben Hassine, so I am trying to use a JdbcPagingItemReader and am getting the error "sortKey must be specified". I think I have specified it, but I am not sure it is correct. Here is my JdbcPagingItemReader:
@Bean
public JdbcPagingItemReader<UserInfo> jdbcPagingItemReader() {
JdbcPagingItemReader<UserInfo> pagingItemReader = new JdbcPagingItemReader<>();
pagingItemReader.setDataSource(dataSource);
pagingItemReader.setFetchSize(3);
pagingItemReader.setRowMapper(new SoftDeleteMapper());
MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
mySqlPagingQueryProvider.setSelectClause("SELECT user_id, user_status");
mySqlPagingQueryProvider.setFromClause("FROM db3.user_purge");
mySqlPagingQueryProvider.setWhereClause( "WHERE user_status = 'I' "
+ "AND purge_status = 'N'");
Map<String, Order> orderByKeys = new HashMap<>();
orderByKeys.put("user_id", Order.ASCENDING);
mySqlPagingQueryProvider.setSortKeys(orderByKeys);
pagingItemReader.setQueryProvider(mySqlPagingQueryProvider);
return pagingItemReader;
}
My updated step:
@Bean
public Step userPurgeStep() {
return stepBuilderFactory.get("userPurgeStep")
.<UserInfo, UserInfo> chunk(10)
.reader(jdbcPagingItemReader())
.writer(compositeSoftDelWriter())
.listener(new StopListener())
.taskExecutor(taskExecutor())
.build();
}
Multi-threading is not compatible with restarts. As mentioned in the Javadoc, you should set saveState to false if you use the JdbcCursorItemReader in a multi-threaded step.
Moreover, the JdbcCursorItemReader is not thread-safe: it wraps a ResultSet object, which is not thread-safe, and it inherits from AbstractItemCountingItemStreamItemReader, which is not thread-safe either. So using it in a multi-threaded step is incorrect. This is actually the cause of your "Unexpected cursor position change" issue: concurrent threads are modifying the cursor position inadvertently.
You need to synchronize access to the reader by wrapping it in a SynchronizedItemStreamReader, or use a JdbcPagingItemReader, which is thread-safe.
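For illustration, here is a minimal sketch of the first option, reusing the reader definition from your question (synchronizedUserPurgeReader is a hypothetical bean name, and SynchronizedItemStreamReader lives in org.springframework.batch.item.support):
@Bean
@StepScope
public SynchronizedItemStreamReader<UserInfo> synchronizedUserPurgeReader() {
    // delegate: the same cursor reader as in the question
    JdbcCursorItemReader<UserInfo> delegate = new JdbcCursorItemReader<>();
    delegate.setDataSource(dataSource);
    delegate.setSql("SELECT user_id, user_status FROM db3.user_purge "
            + "WHERE user_status = 'I' AND purge_status = 'N'");
    delegate.setRowMapper(new SoftDeleteMapper());
    // no restart state for a multi-threaded step, as explained above
    delegate.setSaveState(false);
    // only one thread at a time is allowed to call read() on the delegate
    SynchronizedItemStreamReader<UserInfo> reader = new SynchronizedItemStreamReader<>();
    reader.setDelegate(delegate);
    return reader;
}
You would then point the step at this bean instead of userPurgeReader(). The setSaveState(false) call follows the Javadoc note above, since partial read state is not meaningful when multiple threads share the reader.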
EDIT: Add example with JdbcPagingItemReader
Here is a self-contained, Docker-based sample:
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import com.mysql.cj.jdbc.MysqlDataSource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.utility.DockerImageName;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@ContextConfiguration
public class SO67614305 {
private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("mysql:8.0.24");
@ClassRule
public static MySQLContainer<?> mysql = new MySQLContainer<>(MYSQL_IMAGE);
@Autowired
private DataSource dataSource;
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
@Before
public void setUp() {
String schema = "/org/springframework/batch/core/schema-mysql.sql";
String data = // the script is inline here to have a self contained example
"create table person (ID int not null primary key, name varchar(20));" +
"insert into person values (1, 'foo1'); insert into person values (2, 'foo2');" +
"insert into person values (3, 'foo3'); insert into person values (4, 'foo4');";
ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
databasePopulator.addScript(new ClassPathResource(schema));
databasePopulator.addScript(new ByteArrayResource(data.getBytes()));
databasePopulator.execute(this.dataSource);
}
@Test
public void testJob() throws Exception {
// given
JobParameters jobParameters = new JobParameters();
// when
JobExecution jobExecution = this.jobLauncher.run(this.job, jobParameters);
// then
Assert.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
@Configuration
@EnableBatchProcessing
static class TestConfiguration {
@Bean
public DataSource dataSource() throws Exception {
MysqlDataSource datasource = new MysqlDataSource();
datasource.setURL(mysql.getJdbcUrl());
datasource.setUser(mysql.getUsername());
datasource.setPassword(mysql.getPassword());
datasource.setUseSSL(false);
return datasource;
}
@Bean
public JdbcPagingItemReader<Person> jdbcPagingItemReader() throws Exception {
MySqlPagingQueryProvider mySqlPagingQueryProvider = new MySqlPagingQueryProvider();
mySqlPagingQueryProvider.setSelectClause("SELECT id, name");
mySqlPagingQueryProvider.setFromClause("FROM person");
Map<String, Order> orderByKeys = new HashMap<>();
orderByKeys.put("id", Order.DESCENDING);
mySqlPagingQueryProvider.setSortKeys(orderByKeys);
JdbcPagingItemReader<Person> pagingItemReader = new JdbcPagingItemReader<>();
pagingItemReader.setDataSource(dataSource());
pagingItemReader.setFetchSize(2);
pagingItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
pagingItemReader.setQueryProvider(mySqlPagingQueryProvider);
return pagingItemReader;
}
@Bean
public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) throws Exception {
return jobs.get("job")
.start(steps.get("step").chunk(2)
.reader(jdbcPagingItemReader())
.writer(items -> items.forEach(System.out::println))
.build())
.build();
}
static class Person {
int id;
String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
}
}
This prints the items in descending order, as expected, without complaining about a missing sort key:
Person{id=4, name='foo4'}
Person{id=3, name='foo3'}
Person{id=2, name='foo2'}
Person{id=1, name='foo1'}