Tags: spring-boot, spring-data, spring-batch, spring-batch-admin, spring-batch-job-monitoring

How to make the JobRegistry contain the job information after a server restart in Spring Batch


We have a scenario where the server might crash while a job execution is in STARTED status. In that case, to restart the batch, I understood from the question "Spring Batch resume after server's failure" that the status and end_time column values need to be updated from STARTED to FAILED in batch_job_execution and batch_step_execution for the JobOperator.restart(jobExecutionId) API to work.

But calling it gives "NoSuchJobException: No job configuration with the name [] was registered", because the jobRegistry map inside the JobOperator is empty.

How can I repopulate the JobRegistry with this job information on server restart, so that I can call JobOperator.restart(jobExecutionId)?

Below is my code

  /** The Test properties. */
  @Autowired
  private TestProperties testProperties;

  @Primary
  @Bean(name = "TestDataSource")
  public DataSource batchDataSource() {
    DataSource testDbSrc = DataSourceBuilder.create().username(getUsername()).password(getPassword()).url(getUrl()).build();
    // instanceof is already false for null, so no separate null check is needed
    if (testDbSrc instanceof HikariDataSource hikariDataSource) {
      hikariDataSource.setSchema(getSchema());
    }
    return testDbSrc;
  }

  private String getSchema() {
    return testProperties.getValue("spring.datasource.hikari.schema", "");
  }

  private String getUsername() {
    return testProperties.getValue("spring.datasource.username", "");
  }

  private String getPassword() {
    return testProperties.getValue("spring.datasource.password", "");
  }

  private String getUrl() {
    return testProperties.getValue("spring.datasource.jdbc-url", "");
  }

  @Bean(name = "transactionManager")
  public JdbcTransactionManager batchTransactionManager(@Qualifier("TestDataSource") DataSource dataSource) {
    return new JdbcTransactionManager(dataSource);
  }

  @Bean(name = "TestBatchJobRepository")
  public JobRepository jobRepository(@Qualifier("TestDataSource") DataSource batchDataSource,
      @Qualifier("transactionManager") JdbcTransactionManager batchTransactionManager) throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(batchDataSource);
    factory.setTransactionManager(batchTransactionManager);
    factory.afterPropertiesSet();
    return factory.getObject();
  }

  @Bean(name = "TestBatchJobLauncher")
  public JobLauncher jobLauncher(@Qualifier("TestBatchJobRepository") JobRepository jobRepository) throws Exception {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
  }

  @Bean(name = "TestBatchJobExplorer")
  public JobExplorer jobExplorer(@Qualifier("TestDataSource") DataSource dataSource,
      @Qualifier("transactionManager") JdbcTransactionManager batchTransactionManager) throws Exception {
    final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
    bean.setDataSource(dataSource);
    bean.setTransactionManager(batchTransactionManager);
    bean.setTablePrefix("BATCH_");
    bean.setJdbcOperations(new JdbcTemplate(dataSource));
    bean.afterPropertiesSet();
    return bean.getObject();
  }

  @Bean(name = "TestBatchJobRegistry")
  public JobRegistry jobRegistry() throws Exception {
    return new MapJobRegistry();
  }

  @Bean
  public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(@Qualifier("TestBatchJobRegistry") JobRegistry jobRegistry) {
    JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
    postProcessor.setJobRegistry(jobRegistry);
    return postProcessor;
  }

  @Bean(name = "TestBatchJobOperator")
  public JobOperator jobOperator(@Qualifier("TestBatchJobLauncher") JobLauncher jobLauncher,
      @Qualifier("TestBatchJobRepository") JobRepository jobRepository,
      @Qualifier("TestBatchJobRegistry") JobRegistry jobRegistry,
      @Qualifier("TestBatchJobExplorer") JobExplorer jobExplorer) {
    final SimpleJobOperator jobOperator = new SimpleJobOperator();
    jobOperator.setJobLauncher(jobLauncher);
    jobOperator.setJobRepository(jobRepository);
    jobOperator.setJobRegistry(jobRegistry);
    jobOperator.setJobExplorer(jobExplorer);
    return jobOperator;
  }
}

Since I am using the default in-memory map:

  @Bean(name = "TestBatchJobRegistry")
  public JobRegistry jobRegistry() throws Exception {
    return new MapJobRegistry();
  }

it gets cleared on server restart.
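The effect can be reproduced outside the application: a freshly constructed MapJobRegistry (the state after a JVM restart) knows no job names, and JobRegistry.getJob(...) throws the same NoSuchJobException until something registers the job. The sketch below assumes the Spring Batch 5.x API (JobRegistry.register(JobFactory), ReferenceJobFactory, SimpleJob); the class name JobRegistryDemo and the job name "sampleJob" are hypothetical.

```java
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.MapJobRegistry;
import org.springframework.batch.core.configuration.support.ReferenceJobFactory;
import org.springframework.batch.core.job.SimpleJob;
import org.springframework.batch.core.launch.NoSuchJobException;

public class JobRegistryDemo { // hypothetical demo class

    // Returns true if the registry can resolve the given job name.
    static boolean canResolve(JobRegistry registry, String jobName) {
        try {
            registry.getJob(jobName);
            return true;
        } catch (NoSuchJobException e) {
            // The exception type reported in the question when the name is not registered
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // A brand-new in-memory registry, as after a JVM restart
        JobRegistry registry = new MapJobRegistry();
        System.out.println(canResolve(registry, "sampleJob")); // prints false

        // Register a job by hand (hypothetical name), similar to what a post-processor does
        registry.register(new ReferenceJobFactory(new SimpleJob("sampleJob")));
        System.out.println(canResolve(registry, "sampleJob")); // prints true
    }
}
```

Nothing in the BATCH_ tables feeds this map; it is filled only by code running in the new application context.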

Update 24/10/2023: [Screenshots of the Spring Batch tables]

[Screenshot: debugging on server restart]

On server restart, the JobRegistry is empty. I also tried the following:

  public void restart(@Qualifier("febpBatchJobExplorer") JobExplorer jobExplorer,
      @Qualifier("febpBatchJobRepository") JobRepository jobRepository,
      @Qualifier("febpBatchJobOperator") JobOperator jobOperator) {
    try {
      List<String> jobNames = jobExplorer.getJobNames();
      for (String jobName : jobNames) {
        // Fetch only the latest job instance for this job name
        List<JobInstance> jobInstances = jobExplorer.getJobInstances(jobName, 0, 1);
        if (CollectionUtils.isNotEmpty(jobInstances)) {
          JobInstance jobInstance = jobInstances.get(0);
          List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
          for (JobExecution execution : jobExecutions) {
            // An execution left in STARTED by a crash is marked FAILED at step and job level
            if (execution.getStatus() == BatchStatus.STARTED) {
              for (StepExecution stepExecution : execution.getStepExecutions()) {
                if (stepExecution.getStatus() == BatchStatus.STARTED) {
                  stepExecution.setEndTime(LocalDateTime.now());
                  stepExecution.setStatus(BatchStatus.FAILED);
                  stepExecution.setExitStatus(ExitStatus.FAILED);
                  jobRepository.update(stepExecution);
                }
              }
              execution.setEndTime(LocalDateTime.now());
              execution.setStatus(BatchStatus.FAILED);
              execution.setExitStatus(ExitStatus.FAILED);
              jobRepository.update(execution);
            }
            // FAILED executions (including the ones just marked) are restarted via JobOperator
            if (execution.getStatus() == BatchStatus.FAILED) {
              jobOperator.restart(execution.getId());
            }
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

But jobOperator.restart(execution.getId()) still fails because the jobRegistry is empty.

It fails in the following Spring Batch code: [screenshot]
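SimpleJobOperator.restart(executionId) takes the job name from the execution's JobInstance and looks it up in the JobRegistry, so a quick diagnostic is to compare the job names persisted in the BATCH_ tables with the names the in-memory registry can resolve. This is a minimal sketch assuming the Spring Batch 5.x JobExplorer.getJobNames() and JobRegistry.getJobNames() methods; the class RegistryDiagnostics and its method names are hypothetical.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.explore.JobExplorer;

public final class RegistryDiagnostics { // hypothetical helper

    private RegistryDiagnostics() {
    }

    // Job names stored in the job repository that the registry cannot resolve;
    // restarting any execution of these jobs would throw NoSuchJobException.
    public static Set<String> unregisteredJobNames(JobExplorer jobExplorer, JobRegistry jobRegistry) {
        return difference(jobExplorer.getJobNames(), jobRegistry.getJobNames());
    }

    // Pure helper: elements of 'persisted' that are absent from 'registered', sorted.
    static Set<String> difference(Collection<String> persisted, Collection<String> registered) {
        Set<String> missing = new TreeSet<>(persisted);
        missing.removeAll(registered);
        return missing;
    }
}
```

Logging unregisteredJobNames(...) at startup, before any restart attempt, shows directly whether the failing job name ever made it into the registry.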

Update 27/10/2023: I have provided a sample application to reproduce the issue: https://github.com/PSHREYASHOLLA/SamplebatchApplication.

It is a Maven project, so you can run mvn install, which creates \target\SamplebatchApplication-0.0.1-SNAPSHOT.jar. You can then start it like any Spring Boot app (Liquibase is enabled) with java -jar SamplebatchApplication-0.0.1-SNAPSHOT.jar.

The application.properties file points to a PostgreSQL database. All of our batch configuration is in https://github.com/PSHREYASHOLLA/SamplebatchApplication/blob/main/src/main/java/com/example/postgresql/model/FEBPDBConfig.java.

Start the batch process by calling the REST POST API http://localhost:8080/batch/trigger-or-resume-application-batch-job with the JSON body { "appRestartJobExecutionId": "" }. If appRestartJobExecutionId is empty, the flow is:

com.example.postgresql.controller.BatchController.triggerOrResumeApplicationBatchJobByB2E() ---> com.example.postgresql.model.FebpApplicationJobServiceImpl.triggerApplicationBatchJob() ---> JobLauncher.run()

The job's reader reads 50 records from febp_emp_detail_test, and its writer writes the updated records to febp_emp_tax_detail_test. This is the happy path.

Now, if you call the above API and kill the server after about 5 seconds, only part of the data is committed to febp_emp_tax_detail_test and the job execution stays in STARTED status. If I then restart the server and call the same POST API with the failed job execution ID, the flow is com.example.postgresql.controller.BatchController.triggerOrResumeApplicationBatchJobByB2E() ---> com.example.postgresql.model.FebpApplicationJobServiceImpl.resumeApplicationBatchJob() ---> jobOperator.restart(failedBatchExecutionId). Here the restart call fails because the jobRegistry is empty.

Update 02/11/2023:

After changing my job to a bean, as suggested by Mahmoud Ben Hassine, I am able to restart the job. But now, after the restart, a new job execution starts and reports COMPLETED, yet it does not process the remaining data. If the step runs on a single thread it processes all the data, but if it is multi-threaded it does nothing. Please check https://github.com/PSHREYASHOLLA/SamplebatchApplication/blob/main/src/main/java/com/example/postgresql/model/EmployeeTaxCalculationBatchConfig.java.

Line 60:

Step step = new StepBuilder("FEBP_EMP_TAX_CALCULATION_STEP", jobRepository)
    .<EmployeeDetail, EmployeeTaxDetail>chunk(5, transactionManager)
    .reader(reader.getPagingItemReader())
    .processor(itemProcessor)
    .writer(itemWriter)
    .taskExecutor(actStmntTaskExecutor())
    .throttleLimit(50)
    .build();

If throttleLimit is 1, the step processes the remaining records after the restart, but if it is greater than 1 (multi-threaded), it does not process the remaining records in my sample table.
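One documented caveat that may be relevant: the JdbcPagingItemReader Javadoc notes that the reader is thread-safe between calls to open(ExecutionContext) but advises setting saveState to false when it is used from a multi-threaded client, in which case no restart state is kept. The Spring Batch reference documentation likewise warns that most stateful readers and writers are not designed for multi-threaded steps. The sketch below shows a reader declared that way; it assumes the reader returned by reader.getPagingItemReader() is a JdbcPagingItemReader, and the class name, bean name, "id" sort key and BeanPropertyRowMapper mapping are hypothetical.

```java
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

@Configuration
public class EmployeeReaderConfig { // hypothetical class name

    @Bean
    public JdbcPagingItemReader<EmployeeDetail> employeeDetailReader(
            @Qualifier("TestDataSource") DataSource dataSource) {
        return new JdbcPagingItemReaderBuilder<EmployeeDetail>()
                .name("employeeDetailReader")
                .dataSource(dataSource)
                .selectClause("SELECT *")
                .fromClause("FROM febp_emp_detail_test")
                // Hypothetical unique sort key; paging queries need one
                .sortKeys(Map.of("id", Order.ASCENDING))
                .pageSize(5)
                // Assumes EmployeeDetail is a JavaBean whose properties match the columns
                .rowMapper(new BeanPropertyRowMapper<>(EmployeeDetail.class))
                // Multi-threaded use: do not store the read position in the ExecutionContext
                .saveState(false)
                .build();
    }
}
```

With saveState(false), a restarted execution reads from the first page again, so already-processed rows would have to be filtered out by the job itself (for example through a status column), which this sketch does not show.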


Solution

  • That should not be the case since you have registered a JobRegistryBeanPostProcessor. This is the bean post processor that populates the job registry every time the Spring application context is (re)started.

    EDIT: Answer updated after a minimal example was provided.

    The issue is that the job is not declared as a bean. Instead, it is the configuration class (which is obviously not of type Job) that is declared as a bean with that name. Therefore, the JobRegistryBeanPostProcessor does not find the job and does not register it in the registry.

    The Job itself must be declared as a bean in the application context so that the JobRegistryBeanPostProcessor can post-process it and register it in the registry.
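Why only a bean of type Job ends up in the registry can be seen from a simplified stand-in for the post-processor described in the answer. This is not the real JobRegistryBeanPostProcessor (which also handles job groups and unregistration on shutdown); it is a hypothetical class, SimpleJobRegistrar, assuming the Spring Batch 5.x JobRegistry.register(JobFactory) API and Java 17.

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.DuplicateJobException;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.ReferenceJobFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.FatalBeanException;
import org.springframework.beans.factory.config.BeanPostProcessor;

// Hypothetical, simplified illustration of registry population at context startup.
public class SimpleJobRegistrar implements BeanPostProcessor {

    private final JobRegistry jobRegistry;

    public SimpleJobRegistrar(JobRegistry jobRegistry) {
        this.jobRegistry = jobRegistry;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Only beans whose runtime type is Job are registered, under job.getName();
        // a configuration class bean with the same name is skipped silently.
        if (bean instanceof Job job) {
            try {
                jobRegistry.register(new ReferenceJobFactory(job));
            } catch (DuplicateJobException e) {
                throw new FatalBeanException("Cannot register job " + job.getName(), e);
            }
        }
        return bean;
    }
}
```

Because this runs for every bean each time the context starts, the registry is rebuilt after every server restart, provided the Job is a bean.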
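A minimal sketch of declaring the Job itself as a bean, assuming the Spring Batch 5.x JobBuilder API and a Step bean built as in the question. The class name EmployeeTaxJobConfig, the method names and the job name "FEBP_EMP_TAX_CALCULATION_JOB" are hypothetical; "TestBatchJobRepository" is the repository bean name from the question's configuration.

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EmployeeTaxJobConfig { // hypothetical class name

    // The @Bean method returns a Job, so JobRegistryBeanPostProcessor sees a Job bean
    // and registers it under the name passed to JobBuilder on every context start.
    @Bean
    public Job employeeTaxCalculationJob(
            @Qualifier("TestBatchJobRepository") JobRepository jobRepository,
            Step employeeTaxCalculationStep) { // assumes a Step bean with this name exists
        return new JobBuilder("FEBP_EMP_TAX_CALCULATION_JOB", jobRepository) // hypothetical job name
                .start(employeeTaxCalculationStep)
                .build();
    }
}
```

The name given to JobBuilder has to match the JOB_NAME already stored for the failed instance, since that stored name is what the operator looks up in the registry on restart.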