Search code examples
springspring-batchjooqspring-transactionsspring-batch-tasklet

JOOQ and Spring Batch transaction management


I am currently working on Spring Batch (2.6.3) application and I am using JOOQ (3.14.15) to access MariaDB database (10.5.8). So far I got working few jobs and got to the point where I am testing transactions. I am trying to run tasklet which is to my understanding executed in transaction by default. I have made repository with simple insert which I annotated with @Transactional. I have added Thread.sleep and my "test case" is that I let Spring Batch execute my test tasklet and during sleep period I kill my application. I expected that there will be no record in T_FOO table since I expected rollback of transaction but there is record in my T_FOO table even though in BATCH_STEP_EXECUTION my test step (after application shutdown) has status of STARTED with END_TIME set as null - which I am guessing is representation that is expected when application gets shutdown while step is executing.

I am guessing that I did not setup correctly transaction management for Spring Batch and JOOQ or something along those lines? But it colud be wrong guess of course

Could someone please help me with this issue? I was trying to google how to properly setup JOOQ with Spring Batch but the only resource I could find was this one and I wrote jooq configuration inspired by that article and also I have tried copy paste configuration in article but it did not resolve my issue.

Thanks in advance for your time.


Tasklet test step

public Step testStep() {
        return this.stepBuilderFactory.get(TEST_STEP)
                .tasklet((stepContribution, chunkContext) -> {
                    final Integer batchId = JobUtils.getBatchId(chunkContext);
                    final LocalDateTime batchDate = JobUtils.getBatchDate(chunkContext);

                    importRepository.importBatch(batchId, batchDate);

                    /* Thread.sleep is here only for testing purposes - at this sleep I am shuting down application */
                    log.debug("Waiting start");
                    Thread.sleep(30000);
                    log.debug("Waiting end");

                    return RepeatStatus.FINISHED;
                }).listener(loadingProcessingErrorListener)
                .build();
    }

Transactional(propagation = Propagation.REQUIRED)
public void importBatch(@NonNull Integer batchId, @NonNull LocalDateTime batchDate) {
    /* simple insert select statement */
    context.insertInto(T_FOO)
        select(context.select(asterisk()).from(T_BAR))
        execute();
}

Batch configuration

@Configuration
@RequiredArgsConstructor
public class BatchConfiguration extends DefaultBatchConfigurer {

    private final JobRegistry jobRegistry;
    private final DataSource dataSource;

    @Override
    @NonNull
    @SneakyThrows
    public JobLauncher getJobLauncher() {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry);
        return postProcessor;
    }

    @Override
    @NonNull
    public PlatformTransactionManager getTransactionManager() {
        return new DataSourceTransactionManager(dataSource);
    }
}

JOOQ configuration

@Configuration
@RequiredArgsConstructor
public class JooqConfiguration {

    private final DataSource dataSource;

    @Bean
    public DataSourceConnectionProvider connectionProvider() {
        return new DataSourceConnectionProvider(new TransactionAwareDataSourceProxy(dataSource));
    }

    @Bean
    public DefaultDSLContext context() {
        return new DefaultDSLContext(configuration());
    }

    public DefaultConfiguration configuration() {
        DefaultConfiguration jooqConfiguration = new DefaultConfiguration();

        jooqConfiguration.set(connectionProvider());

        jooqConfiguration.setDataSource(dataSource);
        jooqConfiguration.setSQLDialect(SQLDialect.MARIADB);
        jooqConfiguration.setSettings(new Settings().withExecuteWithOptimisticLocking(true));

        return jooqConfiguration;
    }
}

Console log

2022-05-28 21:26:18.571 DEBUG 15656 --- [cTaskExecutor-1] s.o.p.c.p.a.j.i.ImportLoadingSteps       : Starting loading import for Batch ID [1]
2022-05-28 21:26:18.607  INFO 15656 --- [cTaskExecutor-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [testStep]
2022-05-28 21:26:18.861 DEBUG 15656 --- [cTaskExecutor-1] org.jooq.tools.LoggerListener            : Executing query          : insert into `t_foo`...
2022-05-28 21:26:18.875 DEBUG 15656 --- [cTaskExecutor-1] org.jooq.tools.LoggerListener            : Affected row(s)          : 1
2022-05-28 21:26:18.876 DEBUG 15656 --- [cTaskExecutor-1] s.o.p.c.p.a.j.i.ImportLoadingSteps       : Waiting start
2022-05-28 21:26:23.759  INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler          : Scheduler quartzScheduler_$_NON_CLUSTERED paused.
2022-05-28 21:26:23.783  INFO 15656 --- [ionShutdownHook] o.s.s.quartz.SchedulerFactoryBean        : Shutting down Quartz Scheduler
2022-05-28 21:26:23.783  INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler          : Scheduler quartzScheduler_$_NON_CLUSTERED shutting down.
2022-05-28 21:26:23.783  INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler          : Scheduler quartzScheduler_$_NON_CLUSTERED paused.
2022-05-28 21:26:23.784  INFO 15656 --- [ionShutdownHook] org.quartz.core.QuartzScheduler          : Scheduler quartzScheduler_$_NON_CLUSTERED shutdown complete.
2022-05-28 21:26:23.790  INFO 15656 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...

Process finished with exit code -1

Solution

  • You're doing this:

    jooqConfiguration.set(connectionProvider());
    

    But then, you're also needlessly doing this:

    jooqConfiguration.setDataSource(dataSource);
    

    The latter overrides the former, removing the TransactionAwareDataSourceProxy semantics, and has no purpose otherwise, either. Just remove that call.