Search code examples
spring, spring-batch

Spring Batch: How do I get the execution context into my ItemListenerSupport class?


Hopefully this is an easy one for someone: I want my Spring Batch app to make heavy use of the ItemListenerSupport "onError" methods to track all of the errors encountered in my job, and to collect them all in an email at the end of the job. But isn't the only way to pass data between steps to put it in the StepExecution (to be promoted to the JobExecution later)? How do I get access to the StepExecution from an ItemListener? This may not be possible, because I swear I can't find an example of it.

A link to an example, or any kind of explanation, would be much appreciated!

UPDATE: Here's a link to a gist with my complete current configuration: https://gist.github.com/cnickyd/cbfc6dd39bc2e266a5d2153678b7dc1c


Solution

  • You don't need a job-scoped bean for that. What you can do is make your listener implement ItemStream and use the execution context to store what you need from your "onError" methods. Here is a quick example:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import org.springframework.batch.core.ItemProcessListener;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.listener.JobExecutionListenerSupport;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemStream;
    import org.springframework.batch.item.ItemStreamException;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Self-contained sample job showing how an item listener can record failed
     * items in the step's {@link ExecutionContext} by also implementing
     * {@link ItemStream}, and how a job listener can read them back once the
     * job has finished (e.g. to send a summary email).
     */
    @Configuration
    @EnableBatchProcessing
    public class MyJobConfiguration {
    
        /** Step execution context key under which the items that failed processing are stored. */
        private static final String ERROR_ITEMS_KEY = "errorItems";
    
        /**
         * Processor that passes even numbers through and fails on odd ones, so
         * that the sample has processing errors to collect.
         *
         * @return the sample processor
         */
        @Bean
        public ItemProcessor<Integer, Integer> itemProcessor() {
            return item -> {
                if (item % 2 != 0) {
                    throw new Exception("no odd numbers here!");
                }
                return item;
            };
        }
        
        /**
         * Listener that collects the items whose processing failed.
         *
         * @return the listener; it must be registered on the step both as a
         *         listener and as a stream (see {@link #job})
         */
        @Bean
        public MyItemProcessListener itemListenerSupport() {
            return new MyItemProcessListener();
        }
    
        /**
         * Builds a single-step, fault-tolerant job over the numbers 1..10 that
         * skips every failing item and reports the failures after the job.
         *
         * @param jobs factory used to create the job
         * @param steps factory used to create the step
         * @return the configured job
         */
        @Bean
        public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
            return jobs.get("job")
                    .start(steps.get("step")
                            .<Integer, Integer>chunk(5)
                            .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
                            .processor(itemProcessor())
                            .writer(items -> items.forEach(System.out::println))
                            .faultTolerant()
                            .skip(Exception.class)
                            .skipLimit(10)
                            // Registered twice on purpose: as a listener to receive
                            // onProcessError callbacks, and as a stream so that open()
                            // hands it the step's execution context.
                            .listener(itemListenerSupport())
                            .stream(itemListenerSupport())
                            .build())
                    .listener(new MyJobListener())
                    .build();
        }
    
        /**
         * Boots the application context, runs the job once with empty
         * parameters and closes the context afterwards.
         *
         * @param args ignored
         * @throws Exception if the job cannot be launched
         */
        public static void main(String[] args) throws Exception {
            // try-with-resources so the context (and the beans it owns) is released.
            // NOTE(review): assumes the configured JobLauncher runs the job
            // synchronously, so the job is finished before the context closes.
            try (AnnotationConfigApplicationContext context =
                    new AnnotationConfigApplicationContext(MyJobConfiguration.class)) {
                JobLauncher jobLauncher = context.getBean(JobLauncher.class);
                Job job = context.getBean(Job.class);
                jobLauncher.run(job, new JobParameters());
            }
        }
        
        /**
         * Item process listener that stores every item whose processing failed
         * in the execution context it receives through {@link #open}.
         *
         * <p>Declared static: it does not use any state of the enclosing
         * configuration instance.
         */
        static class MyItemProcessListener implements ItemProcessListener<Integer, Integer>, ItemStream {
    
            /** Context handed over in {@link #open}; {@code null} until then. */
            private ExecutionContext executionContext;
    
            /**
             * Keeps a reference to the execution context and makes sure it holds
             * a list for the error items.
             *
             * @param executionContext the context to record error items in
             */
            @Override
            public void open(ExecutionContext executionContext) throws ItemStreamException {
                this.executionContext = executionContext;
                // Only initialise the list when it is absent: if the context already
                // carries error items (e.g. restored for a restarted step), replacing
                // the entry would silently drop them.
                if (!executionContext.containsKey(ERROR_ITEMS_KEY)) {
                    executionContext.put(ERROR_ITEMS_KEY, new ArrayList<Integer>());
                }
            }
    
            /** Nothing to do: the context is updated directly in {@link #onProcessError}. */
            @Override
            public void update(ExecutionContext executionContext) throws ItemStreamException {
            }
    
            /** Nothing to release. */
            @Override
            public void close() throws ItemStreamException {
            }
    
            /** Not used by this sample. */
            @Override
            public void beforeProcess(Integer item) {
            }
    
            /** Not used by this sample. */
            @Override
            public void afterProcess(Integer item, Integer result) {
            }
    
            /**
             * Appends the failed item to the list kept in the execution context.
             *
             * @param item the item whose processing failed
             * @param e the failure (not recorded by this sample)
             * @throws IllegalStateException if {@link #open} has not been called,
             *         i.e. the listener was not registered as a stream
             */
            @Override
            public void onProcessError(Integer item, Exception e) {
                if (executionContext == null) {
                    throw new IllegalStateException(
                            "No execution context available: register this listener on the step with .stream(...) as well");
                }
                // The value is only ever written by this class as a List<Integer>.
                @SuppressWarnings("unchecked")
                List<Integer> errorItems = (List<Integer>) executionContext.get(ERROR_ITEMS_KEY);
                if (errorItems == null) {
                    errorItems = new ArrayList<>();
                }
                errorItems.add(item);
                executionContext.put(ERROR_ITEMS_KEY, errorItems);
            }
        }
        
        /**
         * Job listener that reports the collected error items once the job is done.
         *
         * <p>Declared static: it does not use any state of the enclosing
         * configuration instance.
         */
        static class MyJobListener extends JobExecutionListenerSupport {
    
            /**
             * Prints the error items recorded by the first step execution of the job.
             *
             * @param jobExecution the finished job execution
             */
            @Override
            public void afterJob(JobExecution jobExecution) {
                // We know there is a single step here. In a real-world scenario you would
                // look up the execution context of the step you need, or use an
                // ExecutionContextPromotionListener to promote the key to the job
                // execution context.
                // Falls back to an empty list when there is no step execution or nothing
                // was recorded, instead of failing with NoSuchElementException.
                Object errorItems = jobExecution.getStepExecutions().stream()
                        .findFirst()
                        .map(stepExecution -> stepExecution.getExecutionContext().get(ERROR_ITEMS_KEY))
                        .orElse(new ArrayList<Integer>());
                System.out.println("Sending email with error items: " + errorItems);
            }
        }
    
    }
    

    This prints:

    2
    4
    6
    8
    10
    Sending email with error items: [1, 3, 5, 7, 9]