Search code examples
springspring-batchcompositeitemwriter

How does Spring Batch CompositeItemWriter manage transaction for delegate writers?


In the batch job step configuration, I plan to execute 2 queries in the writer, the 1st query is to update records in table A, then the 2nd query is to insert new records in table A again.

So far I think CompositeItemWriter can achieve my goal above, i.e., I need to create 2 JdbcBatchItemWriters, one is for update, and the other one is for insert.

My first question is if CompositeItemWriter is a fit for the requirement above?

If yes, that lead to the second question about transaction. For example, if the first update is successful, and the second insert fails. Will the 1st update transaction be rolled back automatically? Otherwise, how to manually pull both updates in the same transaction?

Thanks in advance!


Solution

  • My first question is if CompositeItemWriter is a fit for the requirement above?

    Yes, CompositeItemWriter is the way to go.

    If yes, that lead to the second question about transaction. For example, if the first update is successful, and the second insert fails. Will the 1st update transaction be rolled back automatically? Otherwise, how to manually pull both updates in the same transaction?

    Excellent question! Yes, if the update succeeds in the first writer and then the insert fails in the second writer, all statements will be rolled back automatically. What you need to know is that the transaction is around the execution of the chunk oriented tasklet step (and so around the write method of the composite item writer). Hence, the execution of all sql statements within this method (executed in delegate writers) will be atomic.

    To illustrate this use case, I wrote the following test:

    • Given a table people with two columns id and name with only one record inside it: 1,'foo'
    • Let's imagine a job that reads two records (1,'foo', 2,'bar') and tries to update foo to foo!! and then inserts 2,'bar' in the table. This is done with a CompositeItemWriter with two item writers: UpdateItemWriter and InsertItemWriter
    • The use case is that UpdateItemWriter succeeds but InsertItemWriter fails (by throwing an exception)
    • The expected result is that foo is not updated to foo!! and bar is not inserted in the table (Both sql statements are rolled back due to the exception in the InsertItemWriter)

    Here is the code (it is self-contained so you can try it and see how things work, it uses an embedded hsqldb database which should be in your classpath):

    import java.util.Arrays;
    import java.util.List;
    import javax.sql.DataSource;
    
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    
    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.support.CompositeItemWriter;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.batch.test.JobLauncherTestUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
    import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringRunner;
    import org.springframework.test.jdbc.JdbcTestUtils;
    
    @RunWith(SpringRunner.class)
    @ContextConfiguration(classes = TransactionWithCompositeWriterTest.JobConfiguration.class)
    public class TransactionWithCompositeWriterTest {
    
        @Autowired
        private JobLauncherTestUtils jobLauncherTestUtils;
    
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        @Before
        public void setUp() {
            jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
            jdbcTemplate.update("INSERT INTO people (id, name) VALUES (1, 'foo');");
        }
    
        @Test
        public void testTransactionRollbackWithCompositeWriter() throws Exception {
            // given
            int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
            int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
            int barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
            Assert.assertEquals(1, peopleCount);
            Assert.assertEquals(1, fooCount);
            Assert.assertEquals(0, barCount);
    
            // when
            JobExecution jobExecution = jobLauncherTestUtils.launchJob();
    
            // then
            Assert.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode());
            Assert.assertEquals("Something went wrong!", jobExecution.getAllFailureExceptions().get(0).getMessage());
            StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
            Assert.assertEquals(0, stepExecution.getCommitCount());
            Assert.assertEquals(1, stepExecution.getRollbackCount());
            Assert.assertEquals(0, stepExecution.getWriteCount());
    
            peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
            fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
            barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
            Assert.assertEquals(1, peopleCount); // bar is not inserted
            Assert.assertEquals(0, barCount); // bar is not inserted
            Assert.assertEquals(1, fooCount); // foo is not updated to "foo!!"
        }
    
        @Configuration
        @EnableBatchProcessing
        public static class JobConfiguration {
    
            @Bean
            public DataSource dataSource() {
                return new EmbeddedDatabaseBuilder()
                        .setType(EmbeddedDatabaseType.HSQL)
                        .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
                        .addScript("/org/springframework/batch/core/schema-hsqldb.sql")
                        .build();
            }
    
            @Bean
            public JdbcTemplate jdbcTemplate(DataSource dataSource) {
                return new JdbcTemplate(dataSource);
            }
    
            @Bean
            public ItemReader<Person> itemReader() {
                Person foo = new Person(1, "foo");
                Person bar = new Person(2, "bar");
                return new ListItemReader<>(Arrays.asList(foo, bar));
            }
    
            @Bean
            public ItemWriter<Person> updateItemWriter() {
                return new UpdateItemWriter(dataSource());
            }
    
            @Bean
            public ItemWriter<Person> insertItemWriter() {
                return new InsertItemWriter(dataSource());
            }
    
            @Bean
            public ItemWriter<Person> itemWriter() {
                CompositeItemWriter<Person> compositeItemWriter = new CompositeItemWriter<>();
                compositeItemWriter.setDelegates(Arrays.asList(updateItemWriter(), insertItemWriter()));
                return compositeItemWriter;
            }
    
            @Bean
            public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
                return jobBuilderFactory.get("job")
                        .start(stepBuilderFactory
                                .get("step").<Person, Person>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .build())
                        .build();
            }
    
            @Bean
            public JobLauncherTestUtils jobLauncherTestUtils() {
                return new JobLauncherTestUtils();
            }
        }
    
        public static class UpdateItemWriter implements ItemWriter<Person> {
    
            private JdbcTemplate jdbcTemplate;
    
            public UpdateItemWriter(DataSource dataSource) {
                this.jdbcTemplate = new JdbcTemplate(dataSource);
            }
    
            @Override
            public void write(List<? extends Person> items) {
                for (Person person : items) {
                    if ("foo".equalsIgnoreCase(person.getName())) {
                        jdbcTemplate.update("UPDATE people SET name = 'foo!!' WHERE id = 1");
                    }
                }
            }
        }
    
        public static class InsertItemWriter implements ItemWriter<Person> {
    
            private JdbcTemplate jdbcTemplate;
    
            public InsertItemWriter(DataSource dataSource) {
                this.jdbcTemplate = new JdbcTemplate(dataSource);
            }
    
            @Override
            public void write(List<? extends Person> items) {
                for (Person person : items) {
                    if ("bar".equalsIgnoreCase(person.getName())) {
                        jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
                        throw new IllegalStateException("Something went wrong!");
                    }
                }
            }
        }
    
        public static class Person {
    
            private long id;
    
            private String name;
    
            public Person() {
            }
    
            public Person(long id, String name) {
                this.id = id;
                this.name = name;
            }
    
            public long getId() {
                return id;
            }
    
            public void setId(long id) {
                this.id = id;
            }
    
            public String getName() {
                return name;
            }
    
            public void setName(String name) {
                this.name = name;
            }
        }
    }
    

    My example uses custom item writers but this should work with two JdbcBatchItemWriters as well.

    I hope this helps!