We have a batch job that loads millions of Employee records, each with multiple addresses. It fails to load a few rows when I use chunk-oriented processing.
Please suggest a solution. Here is the Spring Batch code:
/**
 * Spring Batch configuration for the "MyBatisJob" job: a single chunk-oriented step that
 * reads {@link Employee} items through a MyBatis cursor, converts each one to a JSON string
 * with {@link DataProcessor} and appends the strings to a file.
 */
@Configuration
public class EmployeeJobMyBatis {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private EmployeeDataSourceConfig datasourceConfig;
    EmployeeRowMapper rowMapper = null;
    private static final Logger LOG = LogManager.getLogger(EmployeeJobMyBatis.class);

    /**
     * Builds the job, which consists of the single step returned by {@link #step()}.
     *
     * @return the configured job
     * @throws Exception if the step or one of its collaborators cannot be created
     */
    @Bean
    @Qualifier("MyBatisJob")
    public Job mybatisJob() throws Exception {
        return this.jobBuilderFactory.get("MyBatisJob")
                .incrementer(new RunIdIncrementer())
                .start(step())
                .build();
    }

    /**
     * Builds the read/process/write step with a commit interval of 5 items.
     *
     * @return the configured step
     * @throws Exception if the reader (and its SqlSessionFactory) cannot be created
     */
    @Bean
    public Step step() throws Exception {
        return this.stepBuilderFactory.get("EmployeeDataReadStep")
                .<Employee, String>chunk(5)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    /**
     * Creates the MyBatis session factory on the job's data source, loading the mapper
     * from {@code employee.xml} and setting the default executor type to BATCH.
     *
     * @return the session factory
     * @throws Exception if the mapper resource cannot be resolved or the factory cannot be built
     */
    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        PathMatchingResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
        SqlSessionFactoryBean ss = new SqlSessionFactoryBean();
        ss.setDataSource(datasourceConfig.getDataSource());
        ss.setMapperLocations(resourcePatternResolver.getResources("employee.xml"));
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        configuration.setDefaultExecutorType(ExecutorType.BATCH);
        ss.setConfiguration(configuration);
        return ss.getObject();
    }

    /**
     * Creates the cursor-based reader that streams the results of the mapper statement.
     *
     * @return the item reader
     * @throws Exception if the session factory cannot be created
     */
    @Bean
    public MyBatisCursorItemReader<Employee> reader() throws Exception {
        MyBatisCursorItemReader<Employee> reader = new MyBatisCursorItemReader<Employee>();
        reader.setSqlSessionFactory(sqlSessionFactory());
        // MyBatis statement ids are case-sensitive; this must match <select id="employeeData">.
        reader.setQueryId("employeeData");
        return reader;
    }

    /**
     * Creates the processor that serializes each employee to JSON.
     *
     * @return the item processor
     */
    @Bean
    public DataProcessor processor() {
        return new DataProcessor();
    }

    /**
     * Creates the writer: a {@link MultiResourceItemWriter} configured with an item count
     * limit of 2,500,000 per resource, delegating to a flat-file writer that writes each
     * JSON string as one line.
     *
     * @return the item writer
     */
    @Bean
    public MultiResourceItemWriter<String> writer() {
        MultiResourceItemWriter<String> writer = new MultiResourceItemWriter<String>();
        writer.setResource(new FileSystemResource("C:/data/Employee.json"));
        writer.setItemCountLimitPerResource(2500000);
        FlatFileItemWriter<String> fileWriter = new FlatFileItemWriter<String>();
        fileWriter.setLineAggregator(new MyDelimitedLineAggregator());
        writer.setDelegate(fileWriter);
        return writer;
    }
}
/**
 * Converts an {@link Employee} to its JSON representation.
 *
 * <p>Returning {@code null} from an {@code ItemProcessor} tells Spring Batch to filter the
 * item out, so employees without an id are skipped rather than written.
 */
public class DataProcessor implements ItemProcessor<Employee, String> {
    private static final Gson gson = new GsonBuilder().create();

    /**
     * @param employee the item read by the step
     * @return the employee serialized as JSON, or {@code null} (item filtered) when the
     *         employee is {@code null} or has no id
     * @throws Exception never thrown here; declared by the {@code ItemProcessor} contract
     */
    @Override
    public String process(Employee employee) throws Exception {
        // Guard against null first: gson.toJson(null) would otherwise emit the literal "null".
        if (employee == null || employee.getEmplId() == null) {
            return null;
        }
        return gson.toJson(employee);
    }
}
/**
 * Line aggregator for items that are already fully formatted JSON strings: the item is
 * written as the line unchanged.
 *
 * <p>The aggregator keeps no state between calls, so one item can never be emitted in
 * place of another.
 */
public class MyDelimitedLineAggregator extends DelimitedLineAggregator<String> {

    /**
     * @param jsonstr the JSON text of one item
     * @return {@code jsonstr} itself, or an empty string when it is {@code null}
     */
    @Override
    public String aggregate(String jsonstr) {
        return jsonstr != null ? jsonstr : "";
    }
}
/**
 * An employee identified by {@code emplId}, together with a wrapper holding its addresses.
 */
public class Employee {
    String emplId;
    Addresses addressList;

    /** @return the employee id, or {@code null} if it has not been set */
    public String getEmplId() {
        return emplId;
    }

    /**
     * @param value the employee id (a {@code String}, matching the {@code emplId} field)
     */
    public void setEmplId(String value) {
        this.emplId = value;
    }

    /** @return the address wrapper, or {@code null} if it has not been set */
    public Addresses getAddressList() {
        return addressList;
    }

    /** @param value the address wrapper for this employee */
    public void setAddressList(Addresses value) {
        this.addressList = value;
    }
}
/**
 * Wrapper around the list of {@link Address} entries of one employee.
 */
public class Addresses {
    List<Address> addresses;

    /**
     * Returns the backing list, creating an empty one on first access.
     *
     * @return the live, mutable address list; never {@code null}
     */
    public List<Address> getAddresses() {
        if (this.addresses != null) {
            return this.addresses;
        }
        this.addresses = new ArrayList<Address>();
        return this.addresses;
    }
}
public class Address{
String addressLineOne;
String city;
String country;
public String getAddressLineOne(){
return addressLineOne;
}
public void setAddressLineOne(String value) {
this.addressLineOne = value;
}
public String getCity(){
return city;
}
public void setCity(String value) {
this.city = value;
}
public String getCountry(){
return country;
}
public void setCountry(String value) {
this.country = value;
}
}
Here is the MyBatis mapper XML:
employee.xml
<!-- Maps the joined employee/address rows to one Employee per emplId; the address
     columns of each joined row are collected into addressList.addresses. -->
<resultMap id="EmployeeMap" type="Employee">
    <id column="emplId" property="emplId"/>
    <collection property="addressList.addresses"
                javaType="list" ofType="Address">
        <result column="addressLineOne" property="addressLineOne"/>
        <result column="city" property="city"/>
        <result column="country" property="country"/>
    </collection>
</resultMap>
<!-- resultOrdered="true" plus ORDER BY on the parent id: when this statement is read
     through a cursor, MyBatis then hands out an Employee only after the first row of the
     next employee has been seen, i.e. once all of its address rows have been collected.
     Without it the Employee is emitted after its first joined row. -->
<select id="employeeData" resultMap="EmployeeMap" resultOrdered="true">
    select * from employee e left join address a on a.emplId = e.emplId order by e.emplId
</select>
A chunk-oriented step reads rows one by one and maps each row to a domain object (basically a one-to-one mapping). In your case, you have a one-to-many relation, so your step configuration won't work as it is now. What you need to do is implement the driving query pattern: the reader returns only the parent items, and a processor fetches the related rows for each item, as follows:
Edit: Add an example
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
/**
 * Self-contained example of the driving query pattern with Spring Batch: the reader
 * selects only the person rows, and the processor runs a second query per item to attach
 * the person's address before the item is written.
 */
@Configuration
@EnableBatchProcessing
public class MyJob {

    /**
     * Embedded H2 database initialized with the Spring Batch meta-data schema scripts.
     *
     * @return the data source used by both the job repository and the example tables
     */
    @Bean
    public DataSource dataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
    }

    /**
     * @param dataSource the data source to query
     * @return a JDBC template bound to {@code dataSource}
     */
    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    /**
     * Driving query: reads only {@code id} and {@code name} of each person.
     *
     * @return a cursor-based reader mapping each row to a {@link Person}
     */
    @Bean
    public JdbcCursorItemReader<Person> itemReader() {
        return new JdbcCursorItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .sql("select id, name from person")
                .beanRowMapper(Person.class)
                .build();
    }

    /**
     * Processor that completes each person with its address.
     *
     * @return a processor that sets {@link Person#setAddress(Address)} from the address
     *         table; the address stays {@code null} when the person has no address row
     */
    @Bean
    public ItemProcessor<Person, Person> itemProcessor() {
        return new ItemProcessor<Person, Person>() {
            // Injected by Spring into the returned bean instance.
            @Autowired
            private JdbcTemplate jdbcTemplate;

            // Created once and reused for every item instead of once per processed person.
            private final BeanPropertyRowMapper<Address> addressRowMapper =
                    new BeanPropertyRowMapper<>(Address.class);

            @Override
            public Person process(Person person) {
                Address address;
                try {
                    address = jdbcTemplate.queryForObject(
                            "select * from address where personId = ?", addressRowMapper, person.getId());
                } catch (EmptyResultDataAccessException noAddress) {
                    // A person without an address is still a valid item; do not fail the step.
                    address = null;
                }
                person.setAddress(address);
                return person;
            }
        };
    }

    /**
     * @return a writer that prints every item of the chunk to standard output
     */
    @Bean
    public ItemWriter<Person> itemWriter() {
        return items -> {
            for (Person item : items) {
                System.out.println("item = " + item);
            }
        };
    }

    /**
     * Builds the job: one chunk-oriented step with a commit interval of 2.
     *
     * @param jobs  factory for job builders
     * @param steps factory for step builders
     * @return the configured job
     */
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Person, Person>chunk(2)
                        .reader(itemReader())
                        .processor(itemProcessor())
                        .writer(itemWriter())
                        .build())
                .build();
    }

    /**
     * Creates and populates the example tables, then runs the job once.
     *
     * @param args unused
     * @throws Exception if the job cannot be launched
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context once the job has run.
        try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class)) {
            JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
            // DDL goes through execute(); update() is meant for insert/update/delete statements.
            jdbcTemplate.execute("CREATE TABLE address (id INT IDENTITY NOT NULL PRIMARY KEY, personId INT, street VARCHAR(20));");
            jdbcTemplate.execute("CREATE TABLE person (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
            jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (1,1, 'oxford street');");
            jdbcTemplate.update("INSERT INTO address (id, personId, street) VALUES (2,2, 'howard street');");
            jdbcTemplate.update("INSERT INTO person (id, name) VALUES (1, 'foo');");
            jdbcTemplate.update("INSERT INTO person (id, name) VALUES (2, 'bar');");
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    }

    /** A person row (id, name) plus the address attached by the processor. */
    public static class Person {
        private long id;
        private String name;
        private Address address;

        public Person() {
        }

        public Person(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Address getAddress() {
            return address;
        }

        public void setAddress(Address address) {
            this.address = address;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", address=" + address +
                    '}';
        }
    }

    /** An address row (id, personId, street). */
    public static class Address {
        private int id;
        private int personId;
        private String street;

        public Address() {
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getStreet() {
            return street;
        }

        public void setStreet(String street) {
            this.street = street;
        }

        public int getPersonId() {
            return personId;
        }

        public void setPersonId(int personId) {
            this.personId = personId;
        }

        @Override
        public String toString() {
            return "Address{" +
                    "id=" + id +
                    ", street='" + street + '\'' +
                    '}';
        }
    }
}
This example reads person/address data. The reader reads only the person's id and name, and a processor fetches the address for the current item.