Search code examples
javaspring-bootspring-batchspring-aopcglib

AOP configuration seems to be invalid - Spring batch (Skip Listener)


I am using spring batch and I'm trying to do a POC to skip the entries for which I get an exception. To do this, I have written a SkipListener with some logic to populate an external table with the failed records.

package in.novopay.accounting.batch.listeners;

import in.novopay.accounting.account.loans.entity.LoanAccountEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.util.Date;

@Component
@StepScope
public class LoanAccountDpdSkipListener implements SkipListener<Object[], LoanAccountEntity> {

    private static final Logger LOGGER = LoggerFactory.getLogger(LoanAccountDpdSkipListener.class);
    private JdbcTemplate jdbcTemplate;
    private static final String jobName = "loanAccountDpdCalcJob";
    private static final String groupCode = "LMS-EOD-BOD";

    @Autowired
    public LoanAccountDpdSkipListener(DataSource dataSource){
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public void onSkipInWrite(LoanAccountEntity loanAccountEntity, Throwable t){
        LOGGER.info("***Skipping item***: Loan Account Number: {}", loanAccountEntity.getAccountNumber());
        String sql = "INSERT INTO batch_failure_audit (job_name, execution_date, context_type, context_value, failure_message, failure_stack_trace, group_code) VALUES (?, ?, ?, ?, ?, ?, ?)";

        jdbcTemplate.update(sql, jobName, new Date(), LoanAccountEntity.class, loanAccountEntity.getAccountNumber(),
                t.getMessage(), t.getStackTrace(), groupCode);

    }

    @Override
    public void onSkipInProcess(Object[] object, Throwable t){
        Long accountId = (Long) object[0];
        LOGGER.info("***Skipping item***: Loan Account ID: {}", accountId);
        String sql = "INSERT INTO batch_failure_audit (job_name, execution_date, context_type, context_value, failure_message, failure_stack_trace, group_code) VALUES (?, ?, ?, ?, ?, ?, ?)";

        jdbcTemplate.update(sql, jobName, new Date(), LoanAccountEntity.class, accountId.toString(),
                t.getMessage(), t.getStackTrace(), groupCode);

    }


}

My Item Reader

public class LoanAccountDpdCalcItemReader extends JdbcPagingItemReader<Object[]> {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoanAccountDpdCalcItemReader.class);

    public LoanAccountDpdCalcItemReader(DataSource dataSource , Map<String, Object> jobParameters, Long minValue , Long maxValue){
        super();
        setDataSource(dataSource);

        String querySelectClause=" SELECT la.account_id, la.loan_product_id, la.past_due_days, a.account_number, a.currency, " +
                " acs.criteria, acs.id, la.expected_disbursement_date, lp.product_id, la.loan_status, la.npa_ageing_start_date , a.office_id  ";
        String queryFromClause = "  FROM loan_account la JOIN account a ON a.id = la.account_id " +
                " JOIN asset_criteria_slabs acs ON acs.id = la.asset_criteria_slabs_id " +
                " JOIN loan_product lp ON lp.id = la.loan_product_id " +
                " JOIN product_scheme ps ON ps.id = a.product_scheme_id  ";

        String queryWhereClause=" WHERE la.loan_status IN ('ACTIVE','FORECLOSURE_FREEZE') " +
                " and la.account_id >="+minValue.intValue()+" and la.account_id <="+maxValue.intValue();


        final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
        sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
        sqlPagingQueryProviderFactoryBean.setSelectClause(querySelectClause);
        sqlPagingQueryProviderFactoryBean.setFromClause(queryFromClause);
        sqlPagingQueryProviderFactoryBean.setWhereClause(queryWhereClause);
        sqlPagingQueryProviderFactoryBean.setSortKey("account_id");
        try {
            PagingQueryProvider pagingQueryProvider= sqlPagingQueryProviderFactoryBean.getObject();
            if(pagingQueryProvider!=null){
                setQueryProvider(pagingQueryProvider);
            }

        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
        setFetchSize(0);
        int chunk = getChunk(jobParameters);
        setPageSize(chunk);
        setRowMapper(new ResultMapper());
    }

    private  int getChunk(Map<String, Object> jobParameters) {
        int chunk =Constants.WORKER_CHUNK_SIZE;
        if(jobParameters.containsKey(Constants.CHUNK_KEY)){
            Object chunkParam =jobParameters.get(Constants.CHUNK_KEY);
            if(chunkParam instanceof String chunkParamStr){
                chunk  = Integer.parseInt(chunkParamStr);
            }else{
                chunk = (Integer) chunkParam;
            }

        }
        return chunk;
    }


}

My Item Processor

@Component
@StepScope
public class LoanAccountDpdCalcItemProcessor implements ItemProcessor<Object[], LoanAccountEntity> {

    @Autowired
    LoanAccountDpdCalcBatchProcessor loanAccountDpdCalcBatchProcessor;

    @Value("#{jobParameters['job_time']}")
    private String jobTime;

    @Override
    public LoanAccountEntity process(Object[] objects) throws Exception {

        if (StringUtils.isBlank(jobTime)) {
            return null;
        }
        Calendar nowCal = Calendar.getInstance();
        nowCal.setTimeInMillis(Long.parseLong(jobTime));

        return loanAccountDpdCalcBatchProcessor.processIndividualActiveAccount(objects, nowCal.getTime());
    }
}

My Item Writer

@Component
@StepScope
public class LoanAccountDpdCalcItemWriter extends JpaItemWriter<LoanAccountEntity> {
    @Autowired
    private LoanAccountDAOService loanAccountDAOService;

    LoanAccountDpdCalcItemWriter(EntityManagerFactory entityManagerFactory){
        super();
        setEntityManagerFactory(entityManagerFactory);
    }

    @Override
    public void write(Chunk<? extends LoanAccountEntity> chunk) {
        List<? extends LoanAccountEntity> list = chunk.getItems();
        List<LoanAccountEntity> entities = (List<LoanAccountEntity>) list;
        loanAccountDAOService.saveAll(entities);
    }
}

Building the step:

buildStep(ItemReader<I> itemReader,
                                  ItemProcessor<? super I, ? extends O> processor,
                                  ItemWriter<? super O> itemWriter, SkipListener<? super I, ? extends O> skipListener)
return workerStepBuilderFactory.get(stepName)
                    .repository(jobRepository)
                    .jobExplorer(jobExplorer)
                    .inputChannel(requestForWrks)
                    .outputChannel(repliesFromWrks)
                    .<I, O>chunk(chunkSize,transactionManager)
                    .reader(itemReader)
                    .writer(itemWriter)
                     .faultTolerant()
                        .skip(Exception.class)
                        .skipLimit(Integer.MAX_VALUE)
                        .listener(skipListener);

Now, when an exception is thrown in the itemprocessor(I throw it explicitly to test my setup), I get:

[2024-04-17 23:18:15.974] [] [accounting] [ThreadPoolEx-3] [5a6ab4f3c19dbe7c] [1b94098a260b50d75cdad626e7cf9521] [] [] [ERROR] [o.s.b.c.s.AbstractStep] Encountered an error executing step loanAccountDpdCalcJobsStep1 in job loanAccountDpdCalcJob
org.springframework.batch.core.step.skip.SkipListenerFailedException: Fatal exception in SkipListener.
java.lang.RuntimeException: Manually triggering exception to test skipping logic
    at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.callSkipListeners(FaultTolerantChunkProcessor.java:450) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.write(FaultTolerantChunkProcessor.java:429) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:227) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:388) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:312) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-6.1.4.jar!/:6.1.4]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:255) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:261) ~[spring-batch-infrastructure-5.1.1.jar!/:5.1.1]
    at in.novopay.infra.batch.config.TenantAwareTaskDecorator.lambda$decorate$0(TenantAwareTaskDecorator.java:19) ~[infra-batch-0.0.4.RELEASE-plain.jar!/:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: org.springframework.aop.AopInvocationException: AOP configuration seems to be invalid: tried calling method [public void in.novopay.accounting.batch.listeners.LoanAccountDpdSkipListener.onSkipInWrite(in.novopay.accounting.account.loans.entity.LoanAccountEntity,java.lang.Throwable)] on target [in.novopay.accounting.batch.listeners.LoanAccountDpdSkipListener@7af736]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:359) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:717) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at in.novopay.accounting.batch.listeners.LoanAccountDpdSkipListener$$SpringCGLIB$$0.onSkipInWrite(<generated>) ~[!/:?]
    at org.springframework.batch.core.listener.CompositeSkipListener.onSkipInWrite(CompositeSkipListener.java:72) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.listener.MulticasterBatchListener.onSkipInWrite(MulticasterBatchListener.java:297) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.callSkipListeners(FaultTolerantChunkProcessor.java:447) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    ... 13 more
Caused by: java.lang.IllegalArgumentException: argument type mismatch
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:351) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:717) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at in.novopay.accounting.batch.listeners.LoanAccountDpdSkipListener$$SpringCGLIB$$0.onSkipInWrite(<generated>) ~[!/:?]
    at org.springframework.batch.core.listener.CompositeSkipListener.onSkipInWrite(CompositeSkipListener.java:72) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.listener.MulticasterBatchListener.onSkipInWrite(MulticasterBatchListener.java:297) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.callSkipListeners(FaultTolerantChunkProcessor.java:447) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    ... 13 more

I couldn't find anything relevant online, I'm new to AOP so the error is making little sense to me as I have checked the method signatures thoroughly.

I have also built a small sample batch project where this skipping logic seems to be working just fine.


Solution

  • The reasons for this error can be many. But in my case there were two versions of the spring-aop library in the classpath. One in the main application and another one sneaking in from one of our internal helper libraries. Fixing that got rid of the error above.