Tags: java, spring, spring-boot, spring-batch

How to set values in Spring Batch RetryContext


I have the following step definition for a batch job:


  @Autowired
  RetryContextCache retryContextCache;

  @Bean
  public Step correctionStep(JpaTransactionManager transactionManager) {

    return new StepBuilder("correction-step", jobRepository)
        .<String, List<BookingInfo>>chunk(10, transactionManager)
        .reader(customItemReader())
        .processor(correctionProcessor())
        .writer(customItemWriter)
        .taskExecutor(correctionTaskExecutor())
        .faultTolerant()
        .retryPolicy(retryPolicy())
        .backOffPolicy(exponentialBackOffPolicy())
        .retryContextCache(retryContextCache)
        .skipPolicy(skipPolicy())
        .build();
  }

I have bean definitions for RetryContext and RetryContextCache:

  @Bean
  RetryContextCache retryContextCache() {
    return new MapRetryContextCache();
  }

  @Bean
  RetryContext retryContext() {
    return new RetryContextSupport(null);
  }

I use them in the processor as follows:

@Component
public class CorrectionProcessor implements ItemProcessor<String, List<BookingInfo>> {

  @Autowired
  RetryContextCache retryContextCache;

  @Autowired
  RetryContext retryContext;

  @Override
  public List<BookingInfo> process(String bookingId) throws Exception {

    List<BookingInfo> list = new ArrayList<>();

    if (retryContextCache.containsKey(bookingId)) {
      // On a retry, try to restore the list saved by the previous attempt.
      list = (List<BookingInfo>) retryContextCache.get(bookingId).getAttribute(bookingId);
    } else {
      // fetch and populate list from database.
    }

    try {
      // do something with the list.
    } catch (Exception e) {
      // modify something in the list.
      retryContext.setAttribute(bookingId, list);
      retryContextCache.put(bookingId, retryContext);
      throw e;
    }

    return list;
  }
}

As you can see, I try to store some values in the retryContextCache before re-throwing the exception, so that they are available when the retry mechanism runs the processor again.

When a retry happens, execution enters the if branch in the code above. However, retryContextCache.get(bookingId).getAttribute(bookingId) always returns null.

Am I setting the value in retry context incorrectly? Why is this not working?


Solution

  • Here is how I resolved the issue.

    I wanted to save the object state before the retry starts. However, the scope of the RetryContext only begins after the error is thrown, so I was not able to set the value in the RetryContext from inside the processor.

    Instead, I created a step-scoped bean:

    @Bean
    @StepScope
    public Map<String, List<BookingInfo>> objectStateMap() {
      // Keyed by bookingId; the value type matches the map injected into the listener below.
      return new ConcurrentHashMap<>();
    }
    

    I then autowired this map wherever it was required.

    In the processor's catch block, I wrote the modified objects to this map before re-throwing the error.

    Next, I created a SkipListener for my use case. It captures the item that is skipped after a failure, so the saved state for that item can be looked up and handled accordingly.

    @Component
    @Slf4j
    public class CustomSkipListener {
    
      @Autowired
      Map<String, List<BookingInfo>> objectStateMap;
    
    
      @OnSkipInRead
      public void onSkipInRead(Throwable t) {
        log.error("Read has skipped because of error : " + t.getMessage());
      }
    
      @OnSkipInWrite
      public void onSkipInWrite(List<BookingInfo> item, Throwable t) {
        log.error("Write has skipped because of error : " + t.getMessage());
      }
    
      @OnSkipInProcess
      public void onSkipInProcess(String bookingId, Throwable t) {
        List<BookingInfo> bookingInfos = objectStateMap.get(bookingId);
        // do some tasks..
        objectStateMap.remove(bookingId);
      }
    
    }
    

    Finally, I registered this listener in the main job's step definition.
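
The flow described in this answer can be sketched in plain Java, without any Spring Batch classes, to show how the saved state survives across attempts and is cleaned up on skip. This is only a simulation under stated assumptions: the class name StateMapRetrySketch, the hand-rolled retry loop in runWithRetry, and the use of List<String> in place of List<BookingInfo> are all hypothetical stand-ins, not the framework's actual retry or skip machinery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Plain-Java simulation of the state-map approach; no Spring Batch classes are used. */
class StateMapRetrySketch {

    // Stands in for the step-scoped objectStateMap bean (assumption: String payloads).
    static final Map<String, List<String>> objectStateMap = new ConcurrentHashMap<>();

    // Records what the skip handler received, so the outcome can be inspected.
    static final List<String> skipped = new ArrayList<>();

    // Hypothetical processor: always fails, but saves its modified list first.
    static List<String> process(String bookingId) {
        // Reuse state saved by a previous failed attempt, else start fresh
        // (the original code fetches from the database at this point).
        List<String> list = objectStateMap.getOrDefault(bookingId, new ArrayList<>());
        try {
            throw new IllegalStateException("simulated failure");
        } catch (IllegalStateException e) {
            list.add("attempt-" + (list.size() + 1)); // modify something in the list
            objectStateMap.put(bookingId, list);      // save state before re-throwing
            throw e;
        }
    }

    // Stands in for onSkipInProcess: consume the saved state, then clean up.
    static void onSkipInProcess(String bookingId, Throwable t) {
        List<String> saved = objectStateMap.remove(bookingId);
        skipped.add(bookingId + "=" + saved);
    }

    // Hand-rolled loop standing in for the framework's retry and skip policies.
    static void runWithRetry(String bookingId, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                process(bookingId);
                return;
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    onSkipInProcess(bookingId, e); // retries exhausted: skip the item
                }
            }
        }
    }

    public static void main(String[] args) {
        runWithRetry("B-1", 3);
        System.out.println(skipped);
        System.out.println(objectStateMap.isEmpty());
    }
}
```

With three attempts, each retry picks up the list left by the previous attempt, the skip handler receives a list containing all three modifications, and the map is empty afterwards. Removing the entry in the skip handler matters because otherwise the state of every skipped item would stay in the map for the rest of the step.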