Hopefully this is an easy one for someone: I want my Spring Batch app to make heavy use of the ItemListenerSupport "onError" methods to track all of the errors encountered in my job, and collect them all in an email at the end of the job. But isn't the StepExecution (to be promoted to the JobExecution later) the only way to pass data between steps? How do I get access to the StepExecution from an ItemListener? This may not be possible, because I swear I can't find an example of it.
A link to an example, or any kind of explanation, would be much appreciated!
UPDATE: Here's a link to a gist with my complete current configuration: https://gist.github.com/cnickyd/cbfc6dd39bc2e266a5d2153678b7dc1c
You don't need a job-scoped bean for that. You can make your listener implement ItemStream and use the step's execution context to store whatever you need from your "onError" methods. Here is a quick example:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class MyJobConfiguration {

    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
        return item -> {
            if (item % 2 != 0) {
                throw new Exception("no odd numbers here!");
            }
            return item;
        };
    }

    @Bean
    public MyItemProcessListener itemListenerSupport() {
        return new MyItemProcessListener();
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Integer, Integer>chunk(5)
                        .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
                        .processor(itemProcessor())
                        .writer(items -> items.forEach(System.out::println))
                        .faultTolerant()
                        .skip(Exception.class)
                        .skipLimit(10)
                        .listener(itemListenerSupport())
                        .stream(itemListenerSupport())
                        .build())
                .listener(new MyJobListener())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfiguration.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    class MyItemProcessListener implements ItemProcessListener<Integer, Integer>, ItemStream {

        private ExecutionContext executionContext;

        @Override
        public void open(ExecutionContext executionContext) throws ItemStreamException {
            this.executionContext = executionContext;
            this.executionContext.put("errorItems", new ArrayList<Integer>());
        }

        @Override
        public void update(ExecutionContext executionContext) throws ItemStreamException {
        }

        @Override
        public void close() throws ItemStreamException {
        }

        @Override
        public void beforeProcess(Integer item) {
        }

        @Override
        public void afterProcess(Integer item, Integer result) {
        }

        @Override
        @SuppressWarnings("unchecked")
        public void onProcessError(Integer item, Exception e) {
            List<Integer> errorItems = (List<Integer>) executionContext.get("errorItems");
            errorItems.add(item);
            executionContext.put("errorItems", errorItems);
        }
    }

    class MyJobListener extends JobExecutionListenerSupport {

        @Override
        public void afterJob(JobExecution jobExecution) {
            // We know there is a single step here. In a real-world scenario, you would
            // get the execution context of the step you need, or use an
            // ExecutionContextPromotionListener to promote the key to the job execution context.
            ExecutionContext executionContext = jobExecution.getStepExecutions().iterator().next().getExecutionContext();
            System.out.println("Sending email with error items: " + executionContext.get("errorItems"));
        }
    }
}
This prints:
2
4
6
8
10
Sending email with error items: [1, 3, 5, 7, 9]
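For a job with more than one step, the ExecutionContextPromotionListener mentioned in the afterJob comment is one way to avoid picking the step execution by hand. The following is only a configuration sketch, not part of the example above: the class name PromotionConfiguration and the bean name promotionListener are made up for illustration, and it assumes the listener still stores its list under the "errorItems" key.

```java
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the bean could equally live in MyJobConfiguration.
@Configuration
public class PromotionConfiguration {

    @Bean
    public ExecutionContextPromotionListener promotionListener() {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        // When the step ends, copy the "errorItems" entry from the step
        // execution context into the job execution context.
        listener.setKeys(new String[] {"errorItems"});
        return listener;
    }

    // Assumed wiring, not shown here:
    //   - register the bean on the step with .listener(promotionListener())
    //   - in afterJob, read the promoted value with
    //     jobExecution.getExecutionContext().get("errorItems")
}
```

As far as I know, the listener only promotes keys when the step's exit code is COMPLETED by default; if errors from failed steps should also reach the email, setStatuses can be used to widen that, so check the behavior against the Spring Batch version you run.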