Search code examples
spring-batch java-17

Spring Batch Job stop using JobOperator throws NoSuchJobException


My controller wants to stop a currently running job, and it works, but also throws a NoSuchJobException, which is frustrating.

Documentation Spring

Here's the Spring doc on stopping a Job.

I'm using Java 17, Spring Boot 3.1.

Error

Here you can see the Job start and then stop with the error.

2023-07-15T09:20:53.036-05:00  INFO 4450 --- [nio-8080-exec-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=importSpeakersJob]] launched with the following parameters: [{}]
2023-07-15T09:20:53.060-05:00  INFO 4450 --- [nio-8080-exec-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [stepSpeakers]
2023-07-15T09:20:53.674-05:00  INFO 4450 --- [nio-8080-exec-1] n.s.B.E.ExcelRowItemProcessor            : Processing: d&b Y-SUB native
2023-07-15T09:20:54.834-05:00  INFO 4450 --- [nio-8080-exec-1] n.s.B.E.ExcelRowItemProcessor            : Persisting transfer function for d&b audiotechnik Y-SUB Y-SUB
2023-07-15T09:20:55.044-05:00  INFO 4450 --- [nio-8080-exec-1] n.subalignercss.Services.BucketService   : Object ExcelSpeakers/SpeakerToImport/JBL.VTX G28.VTX G28.xlsx was deleted from subaligner
2023-07-15T09:20:55.045-05:00  INFO 4450 --- [nio-8080-exec-1] n.s.B.E.ExcelRowItemProcessor            : Processing: d&b audiotechnik J-SUB J-SUB, INFRA Off, HCD On
2023-07-15T09:21:01.445-05:00  WARN 4450 --- [nio-8080-exec-2] o.s.b.c.l.support.SimpleJobOperator      : Cannot find Job object in the job registry. StoppableTasklet#stop() will not be called

org.springframework.batch.core.launch.NoSuchJobException: No job configuration with the name [importSpeakersJob] was registered
    at org.springframework.batch.core.configuration.support.MapJobRegistry.getJob(MapJobRegistry.java:68) ~[spring-batch-core-5.0.2.jar:5.0.2]

Controller

I'm using the controlJob method to stop the job.

import lombok.extern.log4j.Log4j2;
import nathanlively.subalignercss.DTO.AdminDTO;
import org.springframework.batch.core.*;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.support.RedirectAttributes;
import org.springframework.web.servlet.view.RedirectView;

import java.util.List;
import java.util.Set;

/**
 * Admin page controller that shows the status of a batch job and lets the
 * user start, restart or stop one of the three import jobs.
 */
@Controller
@RequestMapping("/admin")
@Log4j2
public class AdminController {
    private final Job importSpeakersJob;
    private final Job importLApresets;
    private final Job importMatlabResultsJob;
    private final JobLauncher jobLauncher;
    private final JobExplorer jobExplorer;
    private final JobOperator jobOperator;

    public AdminController(@Qualifier("importSpeakersJob") Job importSpeakersJob,
                           @Qualifier("importLApresets") Job importLApresets,
                           @Qualifier("importMatlabResultsJob") Job importMatlabResultsJob,
                           JobLauncher jobLauncher, JobExplorer jobExplorer, JobOperator jobOperator) {
        this.importSpeakersJob = importSpeakersJob;
        this.importLApresets = importLApresets;
        this.importMatlabResultsJob = importMatlabResultsJob;
        this.jobLauncher = jobLauncher;
        this.jobExplorer = jobExplorer;
        this.jobOperator = jobOperator;
    }

    /**
     * Renders the admin page. When the DTO carries a job name, the most recent
     * execution of the most recent instance of that job is looked up and its
     * name and status are copied into the DTO.
     *
     * @param adminDTO form/flash state for the page
     * @param model    view model that receives the (possibly updated) DTO
     * @return the {@code admin} view name
     */
    @GetMapping
    public String showAdminPage(@ModelAttribute("adminDTO") AdminDTO adminDTO, Model model) {
        if (adminDTO == null) {
            adminDTO = AdminDTO.builder().build();
        }

        // The job name is absent on a plain first visit, so guard against null as well as empty.
        String requestedJobName = adminDTO.getJobName();
        if (requestedJobName != null && !requestedJobName.isEmpty()) {
            List<JobInstance> jobInstances = jobExplorer.getJobInstances(requestedJobName, 0, 1);
            if (!jobInstances.isEmpty()) {
                // Ask the explorer for the latest execution directly instead of
                // indexing into the full execution list, whose ordering is easy to get wrong.
                JobExecution lastExecution = jobExplorer.getLastJobExecution(jobInstances.get(0));
                if (lastExecution == null) {
                    // An instance without any execution: nothing meaningful to report.
                    adminDTO.setStatus(BatchStatus.UNKNOWN);
                } else {
                    adminDTO.setJobName(lastExecution.getJobInstance().getJobName());
                    adminDTO.setStatus(lastExecution.getStatus());
                }
            }
        }

        model.addAttribute("adminDTO", adminDTO);
        return "admin";
    }

    /**
     * Starts, restarts or stops the job selected in the form, records a
     * success or error flash message, and redirects back to the admin page.
     *
     * @param adminDTO           form state; {@code getSelectedJob()} names the job to control
     * @param redirectAttributes receives the flash messages and the DTO
     * @param submit             the action: {@code start}, {@code restart} or {@code stop}
     * @return a redirect to {@code /admin}
     */
    @PostMapping("/controlJob")
    public RedirectView controlJob(@ModelAttribute("adminDTO") AdminDTO adminDTO, RedirectAttributes redirectAttributes, @RequestParam("submit") String submit) {
        try {
            String selectedJob = adminDTO.getSelectedJob();
            if (selectedJob == null || selectedJob.isBlank()) {
                // Fail with a readable message rather than an NPE from the switch below.
                throw new IllegalArgumentException("No job selected.");
            }

            switch (submit) {
                case "start", "restart" -> {
                    // "restart" adds a timestamp parameter so the launch gets distinct job parameters.
                    JobParameters jobParameters = "start".equals(submit) ?
                            new JobParametersBuilder().toJobParameters() :
                            new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters();

                    JobExecution jobExecution = switch (selectedJob) {
                        case "importSpeakersJob" -> jobLauncher.run(importSpeakersJob, jobParameters);
                        case "importLApresets" -> jobLauncher.run(importLApresets, jobParameters);
                        case "importMatlabResultsJob" -> jobLauncher.run(importMatlabResultsJob, jobParameters);
                        default -> throw new IllegalArgumentException("Invalid job name: " + selectedJob);
                    };

                    BatchStatus launchStatus = jobExecution.getStatus();
                    log.info("Job {} started with status : {}", selectedJob, launchStatus);
                    redirectAttributes.addFlashAttribute("success", "Job " + selectedJob + " started with status : " + launchStatus + ". I'll text you when it's done. 📲");
                }
                case "stop" -> {
                    try {
                        Set<Long> runningExecutions = jobOperator.getRunningExecutions(selectedJob);
                        if (!runningExecutions.isEmpty()) {
                            // Only the first running execution returned is asked to stop.
                            jobOperator.stop(runningExecutions.iterator().next());
                            redirectAttributes.addFlashAttribute("success", "Job " + selectedJob + " has been requested to stop.");
                        } else {
                            redirectAttributes.addFlashAttribute("error", "Job " + selectedJob + " is not currently running.");
                        }
                    } catch (NoSuchJobException nsje) {
                        redirectAttributes.addFlashAttribute("error", "No such job: " + selectedJob);
                    }
                }
                default -> throw new IllegalArgumentException("Invalid action: " + submit);
            }
        } catch (Exception e) {
            // Request boundary: report any failure to the user instead of returning a 500.
            log.error("Job control operation failed.", e);
            redirectAttributes.addFlashAttribute("error", "Job control operation failed: " + e.getMessage());
        }

        redirectAttributes.addFlashAttribute("adminDTO", adminDTO);

        return new RedirectView("/admin");
    }
}

Batch Config

Here I'm configuring each of the jobs.

import com.google.cloud.storage.Blob;
import jakarta.persistence.EntityManagerFactory;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import nathanlively.subalignercss.BatchProcessing.BatchDTO.TableSectionFlattened;
import nathanlively.subalignercss.BatchProcessing.BatchDTO.TableSections;
import nathanlively.subalignercss.BatchProcessing.ExcelSpeakers.ExcelRow;
import nathanlively.subalignercss.BatchProcessing.ExcelSpeakers.ExcelRowItemProcessor;
import nathanlively.subalignercss.BatchProcessing.ExcelSpeakers.NamedByteArrayResource;
import nathanlively.subalignercss.BatchProcessing.MatlabResults.ResultRow;
import nathanlively.subalignercss.BatchProcessing.MatlabResults.ResultRowItemProcessor;
import nathanlively.subalignercss.Models.Enums.SortType;
import nathanlively.subalignercss.Models.PreAlignment;
import nathanlively.subalignercss.Models.Speaker;
import nathanlively.subalignercss.Services.BucketService;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.extensions.excel.RowMapper;
import org.springframework.batch.extensions.excel.mapping.BeanWrapperRowMapper;
import org.springframework.batch.extensions.excel.poi.PoiItemReader;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.JsonItemReader;
import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.transaction.PlatformTransactionManager;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

@Configuration
@AllArgsConstructor
@Log4j2
public class BatchConfiguration {

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @Autowired
    private JobCompletionNotificationListener listener;

    private final BucketService bucketService;

    @Bean(name = "importLApresets")
    public Job importLApresets(JobRepository jobRepository, Step flattenJsonStep) {
        return new JobBuilder("importLApresets", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(flattenJsonStep)
                .listener(listener)
                .build();
    }

    @Bean
    public Step flattenJsonStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
                                FlatFileItemWriter<List<TableSectionFlattened>> csvWriter) {
        return new StepBuilder("flattenJsonStep", jobRepository)
                .<TableSections, List<TableSectionFlattened>>chunk(1, transactionManager)
                .reader(readUnflattenedJson())
                .processor(flattenJson())
                .writer(csvWriter)
                .build();

    }

    @Bean
    public JsonItemReader<TableSections> readUnflattenedJson() {
        return new JsonItemReaderBuilder<TableSections>()
                .name("readUnflattenedJson")
                .resource(new ClassPathResource("static/json/la.march2023.json"))
                .jsonObjectReader(new JacksonJsonObjectReader<>(TableSections.class))
                .build();
    }

    @Bean
    public JsonFlattenerItemProcessor flattenJson() {
        return new JsonFlattenerItemProcessor();
    }

    @Bean
    public FlatFileItemWriter<List<TableSectionFlattened>> csvWriter() {
        return new FlatFileItemWriterBuilder<List<TableSectionFlattened>>()
                .name("csvItemWriter")
                .resource(new FileSystemResource("la.march2023.csv"))
                .lineAggregator(new DelimitedLineAggregator<>() {
                    {
                        setDelimiter(System.getProperty("line.separator"));
                        setFieldExtractor(list -> {
                            List<String> flattenedStrings = new ArrayList<>();
                            for (TableSectionFlattened flattened : list) {
                                flattenedStrings.add(
                                        "\"" + flattened.getPreset() + "\","
                                                + "\"" + flattened.getDsp() + "\","
                                                + "\"" + flattened.getSource1() + "\","
                                                + "\"" + flattened.getSource2() + "\","
                                                + "\"" + flattened.getSource3() + "\""
                                );
                            }
                            return flattenedStrings.toArray();
                        });
                    }
                })
                .build();
    }





    // Import Excel speakers
    @Bean(name = "importSpeakersJob")
    public Job importSpeakersJob(JobRepository jobRepository, Step stepSpeakers) {
        return new JobBuilder("importSpeakersJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(stepSpeakers)
                .listener(listener)
                .build();
    }

    @Bean
    public Step stepSpeakers(JobRepository jobRepository, PlatformTransactionManager transactionManager, JpaItemWriter<Speaker> speakerJpaItemWriter) {
        return new StepBuilder("stepSpeakers", jobRepository)
                .<ExcelRow, Speaker>chunk(1, transactionManager)
                .reader(excelReader())
                .processor(excelRowItemProcessor())
                .writer(speakerJpaItemWriter)
                .build();
    }

    @Bean
    public MultiResourceItemReader<ExcelRow> excelReader() {
        List<Resource> resources = loadResources();
        Resource[] resourcesArray = resources.toArray(new Resource[0]);

        PoiItemReader<ExcelRow> reader = createPoiReader();

        return createMultiResourceItemReader(resourcesArray, reader);
    }

    private PoiItemReader<ExcelRow> createPoiReader() {
        PoiItemReader<ExcelRow> reader = new PoiItemReader<>();
        reader.setLinesToSkip(1); // HEADERS
        reader.setRowMapper(rowMapper());
        return reader;
    }

    private MultiResourceItemReader<ExcelRow> createMultiResourceItemReader(Resource[] resourcesArray, PoiItemReader<ExcelRow> reader) {
        MultiResourceItemReader<ExcelRow> multiResourceItemReader = new MultiResourceItemReader<>();
        multiResourceItemReader.setResources(resourcesArray);
        multiResourceItemReader.setDelegate(reader);
        return multiResourceItemReader;
    }

    private List<Resource> loadResources() {
        List<Blob> blobs = getBlobs();

        List<Resource> resources = new ArrayList<>();
        for (Blob blob : blobs) {
            String blobName = blob.getName();
            byte[] content = bucketService.downloadObjectIntoMemory(blobName);
            Path path = Paths.get(blobName);
            String fileName = path.getFileName().toString();
            Resource resource = new NamedByteArrayResource(content, fileName);
            resources.add(resource);
        }
        return resources;
    }

    private List<Blob> getBlobs() {
        List<Blob> blobs = bucketService.listAllFilesInFolder("ExcelSpeakers/SpeakerToImport/", SortType.NAME, ".xlsx");
        List<Blob> finalBlobs = bucketService.listAllFilesInFolder("ExcelSpeakers/FinalSpeakerToImport/", SortType.NAME, ".xlsx");
        blobs.addAll(finalBlobs);
        return blobs;
    }

    @Bean
    public RowMapper<ExcelRow> rowMapper() {
        BeanWrapperRowMapper<ExcelRow> rowMapper = new BeanWrapperRowMapper<>();
        rowMapper.setTargetType(ExcelRow.class);
        return rowMapper;
    }

    @Bean
    public ExcelRowItemProcessor excelRowItemProcessor() {
        return new ExcelRowItemProcessor();
    }

    @Bean
    public JpaItemWriter<Speaker> speakerJpaItemWriter() {
        return new JpaItemWriterBuilder<Speaker>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }





    // Import MATLAB results
    @Bean(name = "importMatlabResultsJob")
    public Job importMatlabResultsJob(JobRepository jobRepository, Step importMATLABResults) {
        return new JobBuilder("importMatlabResultsJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(importMATLABResults)
                .listener(listener)
                .build();
    }

    @Bean
    public Step importMATLABResults(JobRepository jobRepository, PlatformTransactionManager transactionManager, JpaItemWriter<PreAlignment> writer) {
        return new StepBuilder("importMATLABResults", jobRepository)
                .<ResultRow, PreAlignment>chunk(1, transactionManager)
                .reader(readMatlabJson())
                .processor(resultRowItemProcessor())
                .writer(writer)
                .build();
    }

    @Bean
    public JsonItemReader<ResultRow> readMatlabJson() {
        return new JsonItemReaderBuilder<ResultRow>()
                .name("readMatlabJson")
                .jsonObjectReader(new JacksonJsonObjectReader<>(ResultRow.class))
                .resource(getJsonFile())
                .build();
    }

    private Resource getJsonFile() {
        List<Blob> blobs = bucketService.listAllFilesInFolder("MATLABresults/", SortType.CREATE_TIME, ".json");
        String objectName = blobs.get(0).getName();
        byte[] content = bucketService.downloadObjectIntoMemory(objectName);
        Path path = Paths.get(objectName);
        String fileName = path.getFileName().toString();
        return new NamedByteArrayResource(content, fileName);
    }

    @Bean
    public ResultRowItemProcessor resultRowItemProcessor() {
        return new ResultRowItemProcessor();
    }


    @Bean
    public JpaItemWriter<PreAlignment> preAlignmentJpaItemWriter() {
        return new JpaItemWriterBuilder<PreAlignment>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }

    @Bean
    public PreAlignmentItemProcessor processor() {
        return new PreAlignmentItemProcessor();
    }

}

Solution

  • By default, the JobRegistry used by the JobOperator has no jobs registered in it, so it cannot find the job by name and throws NoSuchJobException.

    You can solve this problem by defining a JobRegistryBeanPostProcessor bean:

    @Configuration
    public class BatchConfiguration {

        /**
         * Registers every Job bean in the given JobRegistry as it is created, so that
         * jobs can be looked up by name through the registry.
         *
         * Declared static so that creating this BeanPostProcessor does not force the
         * enclosing configuration class to be instantiated early.
         */
        @Bean
        public static JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
            JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
            postProcessor.setJobRegistry(jobRegistry);
            return postProcessor;
        }
    }
    

    P.S. Not sure why the framework does not define it by default as I think it is quite a common thing that will be useful in most applications.