I am using spring batch and I'm trying to do a POC to skip the entries for which I get an exception. To do this, I have written a SkipListener with some logic to populate an external table with the failed records.
package in.novopay.accounting.batch.listeners;
import in.novopay.accounting.account.loans.entity.LoanAccountEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import java.util.Date;
@Component
@StepScope
public class LoanAccountDpdSkipListener implements SkipListener<Object[], LoanAccountEntity> {
private static final Logger LOGGER = LoggerFactory.getLogger(LoanAccountDpdSkipListener.class);
private JdbcTemplate jdbcTemplate;
private static final String jobName = "loanAccountDpdCalcJob";
private static final String groupCode = "LMS-EOD-BOD";
@Autowired
public LoanAccountDpdSkipListener(DataSource dataSource){
jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public void onSkipInWrite(LoanAccountEntity loanAccountEntity, Throwable t){
LOGGER.info("***Skipping item***: Loan Account Number: {}", loanAccountEntity.getAccountNumber());
String sql = "INSERT INTO batch_failure_audit (job_name, execution_date, context_type, context_value, failure_message, failure_stack_trace, group_code) VALUES (?, ?, ?, ?, ?, ?, ?)";
jdbcTemplate.update(sql, jobName, new Date(), LoanAccountEntity.class, loanAccountEntity.getAccountNumber(),
t.getMessage(), t.getStackTrace(), groupCode);
}
@Override
public void onSkipInProcess(Object[] object, Throwable t){
Long accountId = (Long) object[0];
LOGGER.info("***Skipping item***: Loan Account ID: {}", accountId);
String sql = "INSERT INTO batch_failure_audit (job_name, execution_date, context_type, context_value, failure_message, failure_stack_trace, group_code) VALUES (?, ?, ?, ?, ?, ?, ?)";
jdbcTemplate.update(sql, jobName, new Date(), LoanAccountEntity.class, accountId.toString(),
t.getMessage(), t.getStackTrace(), groupCode);
}
}
My Item Reader
public class LoanAccountDpdCalcItemReader extends JdbcPagingItemReader<Object[]> {
private static final Logger LOGGER = LoggerFactory.getLogger(LoanAccountDpdCalcItemReader.class);
public LoanAccountDpdCalcItemReader(DataSource dataSource , Map<String, Object> jobParameters, Long minValue , Long maxValue){
super();
setDataSource(dataSource);
String querySelectClause=" SELECT la.account_id, la.loan_product_id, la.past_due_days, a.account_number, a.currency, " +
" acs.criteria, acs.id, la.expected_disbursement_date, lp.product_id, la.loan_status, la.npa_ageing_start_date , a.office_id ";
String queryFromClause = " FROM loan_account la JOIN account a ON a.id = la.account_id " +
" JOIN asset_criteria_slabs acs ON acs.id = la.asset_criteria_slabs_id " +
" JOIN loan_product lp ON lp.id = la.loan_product_id " +
" JOIN product_scheme ps ON ps.id = a.product_scheme_id ";
String queryWhereClause=" WHERE la.loan_status IN ('ACTIVE','FORECLOSURE_FREEZE') " +
" and la.account_id >="+minValue.intValue()+" and la.account_id <="+maxValue.intValue();
final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
sqlPagingQueryProviderFactoryBean.setSelectClause(querySelectClause);
sqlPagingQueryProviderFactoryBean.setFromClause(queryFromClause);
sqlPagingQueryProviderFactoryBean.setWhereClause(queryWhereClause);
sqlPagingQueryProviderFactoryBean.setSortKey("account_id");
try {
PagingQueryProvider pagingQueryProvider= sqlPagingQueryProviderFactoryBean.getObject();
if(pagingQueryProvider!=null){
setQueryProvider(pagingQueryProvider);
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
setFetchSize(0);
int chunk = getChunk(jobParameters);
setPageSize(chunk);
setRowMapper(new ResultMapper());
}
private int getChunk(Map<String, Object> jobParameters) {
int chunk =Constants.WORKER_CHUNK_SIZE;
if(jobParameters.containsKey(Constants.CHUNK_KEY)){
Object chunkParam =jobParameters.get(Constants.CHUNK_KEY);
if(chunkParam instanceof String chunkParamStr){
chunk = Integer.parseInt(chunkParamStr);
}else{
chunk = (Integer) chunkParam;
}
}
return chunk;
}
}
My Item Processor
@Component
@StepScope
public class LoanAccountDpdCalcItemProcessor implements ItemProcessor<Object[], LoanAccountEntity> {
@Autowired
LoanAccountDpdCalcBatchProcessor loanAccountDpdCalcBatchProcessor;
@Value("#{jobParameters['job_time']}")
private String jobTime;
@Override
public LoanAccountEntity process(Object[] objects) throws Exception {
if (StringUtils.isBlank(jobTime)) {
return null;
}
Calendar nowCal = Calendar.getInstance();
nowCal.setTimeInMillis(Long.parseLong(jobTime));
return loanAccountDpdCalcBatchProcessor.processIndividualActiveAccount(objects, nowCal.getTime());
}
}
My Item Writer
@Component
@StepScope
public class LoanAccountDpdCalcItemWriter extends JpaItemWriter<LoanAccountEntity> {
@Autowired
private LoanAccountDAOService loanAccountDAOService;
LoanAccountDpdCalcItemWriter(EntityManagerFactory entityManagerFactory){
super();
setEntityManagerFactory(entityManagerFactory);
}
@Override
public void write(Chunk<? extends LoanAccountEntity> chunk) {
List<? extends LoanAccountEntity> list = chunk.getItems();
List<LoanAccountEntity> entities = (List<LoanAccountEntity>) list;
loanAccountDAOService.saveAll(entities);
}
}
Building the step:
buildStep(ItemReader<I> itemReader,
ItemProcessor<? super I, ? extends O> processor,
ItemWriter<? super O> itemWriter, SkipListener<? super I, ? extends O> skipListener)
return workerStepBuilderFactory.get(stepName)
.repository(jobRepository)
.jobExplorer(jobExplorer)
.inputChannel(requestForWrks)
.outputChannel(repliesFromWrks)
.<I, O>chunk(chunkSize,transactionManager)
.reader(itemReader)
.writer(itemWriter)
.faultTolerant()
.skip(Exception.class)
.skipLimit(Integer.MAX_VALUE)
.listener(skipListener);
Now, when an exception is thrown in the itemprocessor(I throw it explicitly to test my setup), I get:
[2024-04-17 23:18:15.974] [] [accounting] [ThreadPoolEx-3] [5a6ab4f3c19dbe7c] [1b94098a260b50d75cdad626e7cf9521] [] [] [ERROR] [o.s.b.c.s.AbstractStep] Encountered an error executing step loanAccountDpdCalcJobsStep1 in job loanAccountDpdCalcJob
org.springframework.batch.core.step.skip.SkipListenerFailedException: Fatal exception in SkipListener.
java.lang.RuntimeException: Manually triggering exception to test skipping logic
at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.callSkipListeners(FaultTolerantChunkProcessor.java:450) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.write(FaultTolerantChunkProcessor.java:429) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:227) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:388) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:312) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-6.1.4.jar!/:6.1.4]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:255) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:261) ~[spring-batch-infrastructure-5.1.1.jar!/:5.1.1]
at in.novopay.infra.batch.config.TenantAwareTaskDecorator.lambda$decorate$0(TenantAwareTaskDecorator.java:19) ~[infra-batch-0.0.4.RELEASE-plain.jar!/:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: org.springframework.aop.AopInvocationException: AOP configuration seems to be invalid: tried calling method [public void in.novopay.accounting.batch.listeners.LoanAccountDpdSkipListener.onSkipInWrite(in.novopay.accounting.account.loans.entity.LoanAccountEntity,java.lang.Throwable)] on target [in.novopay.accounting.batch.listeners.LoanAccountDpdSkipListener@7af736]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:359) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:717) ~[spring-aop-6.1.4.jar!/:6.1.4]
at in.novopay.accounting.batch.listeners.LoanAccountDpdSkipListener$$SpringCGLIB$$0.onSkipInWrite(<generated>) ~[!/:?]
at org.springframework.batch.core.listener.CompositeSkipListener.onSkipInWrite(CompositeSkipListener.java:72) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
at org.springframework.batch.core.listener.MulticasterBatchListener.onSkipInWrite(MulticasterBatchListener.java:297) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.callSkipListeners(FaultTolerantChunkProcessor.java:447) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
... 13 more
Caused by: java.lang.IllegalArgumentException: argument type mismatch
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:351) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765) ~[spring-aop-6.1.4.jar!/:6.1.4]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:717) ~[spring-aop-6.1.4.jar!/:6.1.4]
at in.novopay.accounting.batch.listeners.LoanAccountDpdSkipListener$$SpringCGLIB$$0.onSkipInWrite(<generated>) ~[!/:?]
at org.springframework.batch.core.listener.CompositeSkipListener.onSkipInWrite(CompositeSkipListener.java:72) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
at org.springframework.batch.core.listener.MulticasterBatchListener.onSkipInWrite(MulticasterBatchListener.java:297) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.callSkipListeners(FaultTolerantChunkProcessor.java:447) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
... 13 more
I couldn't find anything relevant online, I'm new to AOP so the error is making little sense to me as I have checked the method signatures thoroughly.
I have also built a small sample batch project where this skipping logic seems to be working just fine.
The reasons for this error can be many. But in my case there were two versions of the spring-aop library in the classpath. One in the main application and another one sneaking in from one of our internal helper libraries. Fixing that got rid of the error above.