Search code examples
java · spring-batch · mybatis · chunks

Records missing due to chunk size in Spring Batch


We have a batch job that loads millions of Employee records, each with multiple addresses. It fails to load a few rows when I use chunk-oriented processing.

For example, with a chunk size of 5 we lose the 6th record, which is associated with the employee in the 5th row (refer to the image).

Please suggest a solution. Here is the Spring Batch code

    /**
     * Spring Batch configuration for the employee export job.
     *
     * <p>A MyBatis cursor reader feeds a chunk-oriented step; each {@code Employee}
     * is turned into a JSON string by {@link DataProcessor} and handed to a
     * {@code MultiResourceItemWriter} that delegates to a flat-file writer.
     */
    @Configuration
public class EmployeeJobMyBatis {

    private static final Logger LOG = LogManager.getLogger(EmployeeJobMyBatis.class);

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private EmployeeDataSourceConfig datasourceConfig;

    // Not used by any bean in this class; kept so existing references keep compiling.
    EmployeeRowMapper rowMapper = null;

    /**
     * Defines the job, consisting of the single read/process/write step.
     *
     * @return the job registered under the name {@code MyBatisJob}
     * @throws Exception if the step or one of its collaborators cannot be built
     */
    @Bean
    @Qualifier("MyBatisJob")
    public Job mybatisJob() throws Exception {
        return this.jobBuilderFactory.get("MyBatisJob")
                .incrementer(new RunIdIncrementer())
                .start(step())
                .build();
    }

    /**
     * Defines the chunk-oriented step with a commit interval of 5 items.
     *
     * <p>NOTE(review): the reader's result map folds several joined rows into one
     * {@code Employee}. The last item of a chunk is processed before the reader
     * fetches the following row, so confirm that the mapped statement only hands
     * out fully populated objects (MyBatis {@code resultOrdered="true"} together
     * with a matching {@code ORDER BY}).
     *
     * @return the configured step
     * @throws Exception if the reader cannot be created
     */
    @Bean
    public Step step() throws Exception {
        return this.stepBuilderFactory.get("EmployeeDataReadStep")
                .<Employee, String>chunk(5)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    /**
     * Builds the MyBatis session factory from the configured data source and the
     * {@code employee.xml} mapper, using the BATCH executor by default.
     *
     * @return the session factory
     * @throws Exception if the mapper resources cannot be resolved or the factory fails to build
     */
    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        PathMatchingResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();

        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        configuration.setDefaultExecutorType(ExecutorType.BATCH);

        SqlSessionFactoryBean ss = new SqlSessionFactoryBean();
        ss.setDataSource(datasourceConfig.getDataSource());
        ss.setMapperLocations(resourcePatternResolver.getResources("employee.xml"));
        ss.setConfiguration(configuration);
        return ss.getObject();
    }

    /**
     * Creates the cursor-based reader that streams employees from the mapped statement.
     *
     * @return the item reader
     * @throws Exception if the session factory cannot be created
     */
    @Bean
    public MyBatisCursorItemReader<Employee> reader() throws Exception {
        MyBatisCursorItemReader<Employee> reader = new MyBatisCursorItemReader<Employee>();
        reader.setSqlSessionFactory(sqlSessionFactory());
        // Statement ids are matched exactly, so this must equal the <select id="employeeData"> in the mapper.
        reader.setQueryId("employeeData");
        return reader;
    }

    /**
     * Creates the processor that serializes each employee to JSON.
     *
     * @return the item processor
     */
    @Bean
    public DataProcessor processor() {
        return new DataProcessor();
    }

    /**
     * Creates the writer: a multi-resource writer with an item-count limit of
     * 2,500,000 per resource, delegating to a flat-file writer that emits each
     * JSON string as one line.
     *
     * @return the item writer
     */
    @Bean
    public MultiResourceItemWriter<String> writer() {
        FlatFileItemWriter<String> fileWriter = new FlatFileItemWriter<String>();
        fileWriter.setLineAggregator(new MyDelimitedLineAggregator());

        MultiResourceItemWriter<String> writer = new MultiResourceItemWriter<String>();
        writer.setResource(new FileSystemResource("C:/data/Employee.json"));
        writer.setItemCountLimitPerResource(2500000);
        writer.setDelegate(fileWriter);
        return writer;
    }

}


 /**
  * Item processor that serializes an {@link Employee} to a JSON string with Gson.
  */
 public class DataProcessor implements ItemProcessor<Employee, String> {

    private static final Gson gson = new GsonBuilder().create();

    /**
     * Converts one employee to JSON.
     *
     * @param employee the item handed over by the reader
     * @return the JSON text, or {@code null} when the employee has no id
     *         (returning {@code null} from an item processor filters the item out)
     * @throws Exception declared by the {@code ItemProcessor} contract
     */
    @Override
    public String process(Employee employee) throws Exception {
        // Employee exposes its identifier through getEmplId(); there is no getId().
        if (employee != null && employee.getEmplId() == null) {
            return null;
        }
        return gson.toJson(employee);
    }

}

/**
 * Line aggregator for items that are already complete output lines: the incoming
 * JSON string is passed through unchanged.
 *
 * <p>NOTE(review): the instance remembers the last non-null line and returns it
 * again for a {@code null} input, so it is stateful — confirm that this fallback
 * is intended and that the aggregator is not shared between threads.
 */
public class MyDelimitedLineAggregator extends DelimitedLineAggregator<String> {

    /** Last non-null line seen; starts out as the empty string. */
    String returnString = "";

    /**
     * @param jsonstr the line to write, may be {@code null}
     * @return {@code jsonstr} when it is non-null, otherwise the previously returned line
     */
    @Override
    public String aggregate(String jsonstr) {
        if (jsonstr == null) {
            return returnString;
        }
        returnString = jsonstr;
        return jsonstr;
    }
}



/**
 * Employee record: an identifier plus the wrapper object holding the employee's addresses.
 */
public class Employee {

    String emplId;
    Addresses addressList;

    /** @return the employee identifier, or {@code null} if none has been set */
    public String getEmplId() {
        return emplId;
    }

    /**
     * @param value the employee identifier; typed as {@code String} to match the
     *              field and its getter
     */
    public void setEmplId(String value) {
        this.emplId = value;
    }

    /** @return the address wrapper, or {@code null} if none has been set */
    public Addresses getAddressList() {
        return addressList;
    }

    /** @param value the address wrapper to attach to this employee */
    public void setAddressList(Addresses value) {
        this.addressList = value;
    }
}
/**
 * Wrapper around the list of {@link Address} entries belonging to one employee.
 */
public class Addresses {

    List<Address> addresses;

    /**
     * Returns the backing list, creating an empty one on first access.
     *
     * @return the live, never-{@code null} list of addresses
     */
    public List<Address> getAddresses() {
        if (addresses != null) {
            return addresses;
        }
        addresses = new ArrayList<>();
        return addresses;
    }
}
 public class Address{
   String addressLineOne;
   String city;
   String country;

   public String getAddressLineOne(){
       return addressLineOne;
   }

   public void setAddressLineOne(String value) {
       this.addressLineOne = value;
   }

   public String getCity(){
       return city;
   }

   public void setCity(String value) {
       this.city = value;
   }
   public String getCountry(){
       return country;
   }

   public void setCountry(String value) {
       this.country = value;
   }
}

Here is the MyBatis Mapper xml

Employee.xml

<!-- Collapses the joined employee/address rows into one Employee per emplId,
     collecting the address columns into addressList.addresses. -->
<resultMap id="EmployeeMap" type="Employee">
  <id column="emplId" property="emplId"/>
  <collection property="addressList.addresses"
      javaType="list" ofType="Address">
    <result column="addressLineOne" property="addressLineOne"/>
    <result column="city" property="city"/>
    <result column="country" property="country"/>
  </collection>
</resultMap>
<!-- This statement is read through a cursor, one object at a time. Without
     resultOrdered="true" an Employee can be handed out as soon as its first row
     is seen, and later address rows are added to an object the caller may
     already have processed. resultOrdered tells MyBatis that all rows of one
     employee are adjacent, so the object is only released once the next emplId
     (or the end of the result set) is reached; the ORDER BY guarantees that
     adjacency. Columns are listed explicitly because the join would otherwise
     return emplId twice. -->
<select id="employeeData" resultMap="EmployeeMap" resultOrdered="true">
  select e.emplId, a.addressLineOne, a.city, a.country
  from employee e
  left join address a on a.emplId = e.emplId
  order by e.emplId
</select>

Solution

  • A chunk-oriented step reads rows one by one and maps each one to a domain object (basically a one-to-one mapping). In your case, you have a one-to-many relation, so your step configuration won't work as it is now. What you need to do is implement the driving query pattern as follows:

    • Make the reader read the employee details without the addresses
    • Use an item processor that fetches addresses for the current employee

    Edit: Add an example

    import javax.sql.DataSource;
    
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.database.JdbcCursorItemReader;
    import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jdbc.core.BeanPropertyRowMapper;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
    import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
    
    @Configuration
    @EnableBatchProcessing
    public class MyJob {
    
        @Bean
        public DataSource dataSource() {
            return new EmbeddedDatabaseBuilder()
                    .setType(EmbeddedDatabaseType.H2)
                    .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                    .addScript("/org/springframework/batch/core/schema-h2.sql")
                    .build();
        }
    
        @Bean
        public JdbcTemplate jdbcTemplate(DataSource dataSource) {
            return new JdbcTemplate(dataSource);
        }
    
        @Bean
        public JdbcCursorItemReader<Person> itemReader() {
            return new JdbcCursorItemReaderBuilder<Person>()
                    .name("personItemReader")
                    .dataSource(dataSource())
                    .sql("select id, name from person")
                    .beanRowMapper(Person.class)
                    .build();
        }
    
        @Bean
        public ItemProcessor<Person, Person> itemProcessor() {
            return new ItemProcessor<Person, Person>() {
                @Autowired
                private JdbcTemplate jdbcTemplate;
    
                @Override
                public Person process(Person person) {
                    Address address = jdbcTemplate.queryForObject("select * from address where personId = ?", new Object[]{person.getId()}, new BeanPropertyRowMapper<>(Address.class));
                    person.setAddress(address);
                    return person;
                }
            };
        }
    
        @Bean
        public ItemWriter<Person> itemWriter() {
            return items -> {
                for (Person item : items) {
                    System.out.println("item = " + item);
                }
            };
        }
    
        @Bean
        public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
            return jobs.get("job")
                    .start(steps.get("step")
                            .<Person, Person>chunk(2)
                            .reader(itemReader())
                            .processor(itemProcessor())
                            .writer(itemWriter())
                            .build())
                    .build();
        }
    
        public static void main(String[] args) throws Exception {
            ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
            JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
            jdbcTemplate.update("CREATE TABLE address (id INT IDENTITY NOT NULL PRIMARY KEY, personId INT, street VARCHAR(20));");
            jdbcTemplate.update("CREATE TABLE person (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
            jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (1,1, 'oxford street');");
            jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (2,2, 'howard street');");
            jdbcTemplate.update("INSERT INTO person (id, name) VALUES (1, 'foo');");
            jdbcTemplate.update("INSERT INTO person (id, name) VALUES (2, 'bar');");
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    
        public static class Person {
            private long id;
            private String name;
            private Address address;
    
            public Person() {
            }
    
            public Person(long id, String name) {
                this.id = id;
                this.name = name;
            }
    
            public long getId() {
                return id;
            }
    
            public void setId(long id) {
                this.id = id;
            }
    
            public String getName() {
                return name;
            }
    
            public void setName(String name) {
                this.name = name;
            }
    
            public Address getAddress() {
                return address;
            }
    
            public void setAddress(Address address) {
                this.address = address;
            }
    
            @Override
            public String toString() {
                return "Person{" +
                        "id=" + id +
                        ", name='" + name + '\'' +
                        ", address=" + address +
                        '}';
            }
        }
    
        public static class Address {
            private int id;
            private int personId;
            private String street;
    
            public Address() {
            }
    
            public int getId() {
                return id;
            }
    
            public void setId(int id) {
                this.id = id;
            }
    
            public String getStreet() {
                return street;
            }
    
            public void setStreet(String street) {
                this.street = street;
            }
    
            public int getPersonId() {
                return personId;
            }
    
            public void setPersonId(int personId) {
                this.personId = personId;
            }
    
            @Override
            public String toString() {
                return "Address{" +
                        "id=" + id +
                        ", street='" + street + '\'' +
                        '}';
            }
        }
    
    }
    

    This example reads person/address data. The reader reads only the person's id and name, and a processor fetches the address for the current item.