I have the below step definition for a batch job:
@Autowired
RetryContextCache retryContextCache;
@Bean
public Step correctionStep(JpaTransactionManager transactionManager) {
return new StepBuilder("correction-step", jobRepository)
.<String, List<BookingInfo>>chunk(10, transactionManager)
.reader(customItemReader())
.processor(correctionProcessor())
.writer(customItemWriter)
.taskExecutor(correctionTaskExecutor())
.faultTolerant()
.retryPolicy(retryPolicy())
.backOffPolicy(exponentialBackOffPolicy())
.retryContextCache(retryContextCache)
.skipPolicy(skipPolicy())
.build();
}
I have bean definitions for RetryContext
and RetryContextCache
:
@Bean
RetryContextCache retryContextCache() {
return new MapRetryContextCache();
}
@Bean
RetryContext retryContext() {
return new RetryContextSupport(null);
}
Now, I use them in the processor
like below:
@Component
public class CorrectionProcessor implements ItemProcessor <String, List <BookingInfo>> {
@Autowired
RetryContextCache retryContextCache;
@Autowired
RetryContext retryContext;
public List<BookingInfo> process(String bookingId) throws Exception {
List<BookingInfo> list = new ArrayList <> ();
if (retryContextCache.containsKey(bookingId)) {
list = (List<BookingInfo>) retryContextCache.get(bookingId).getAttribute(bookingId);
} else {
// fetch and populate list from database.
}
try {
// do something with the list.
} catch (Exception e) {
// modify something in the list.
retryContext.setAttribute(bookingId, list);
retryContextCache.put(bookingId, retryContext);
throw e;
}
}
}
You can see that I try to set some values in the retryContextCache
before I re-throw the exception for the retry mechanism to work.
When retry happens, it goes inside the if
condition mentioned in the above code.
But, the value of retryContextCache.get(bookingId).getAttribute(bookingId)
is always null
.
Am I setting the value in retry context incorrectly? Why is this not working?
Here is how I resolved the issue.
I wanted to save object state before the retry starts. The scope for RetryContext
starts right after the error is thrown. So, I was not able to set the value in RetryContext
.
So, I created a bean with StepScope
:
@Bean
@StepScope
public Map<String,BookingInfo> objectStateMap() {
return new ConcurrentHashMap<>();
}
And then, I autowired this hash wherever required.
Then, I wrote the modified objects to this hash before the error is re-thrown in the catch block.
Then, I created a SkipListener
for my usecase. This will capture the skipped object in case of failure. And then, it will do their respective task accordingly.
@Component
@Slf4j
public class CustomSkipListener {
@Autowired
Map<String, List<BookingInfo>> objectStateMap;
@OnSkipInRead
public void onSkipInRead(Throwable t) {
log.error("Read has skipped because of error : " + t.getMessage());
}
@OnSkipInWrite
public void onSkipInWrite(List<BookingInfo> item, Throwable t) {
log.error("Write has skipped because of error : " + t.getMessage());
}
@OnSkipInProcess
public void onSkipInProcess(String bookingId, Throwable t) {
List<BookingInfo> bookingInfos = objectStateMap.get(bookingId);
// do some tasks..
objectStateMap.remove(bookingId);
}
}
Just registered this listener
into the main job.