Search code examples
java · spring · spring-boot · spring-batch

Spring Batch issue with MultiResourceItemWriter and ClassifierCompositeItemWriter


I am using Spring Boot + Batch v2.7.1 in my project, and there appears to be a bug when reading from a FlatFileItemReader and writing through a ClassifierCompositeItemWriter that delegates to MultiResourceItemWriters: the itemCountLimitPerResource value is not honored, and the output files contain the wrong number of records.

I am reading a CSV file and splitting it into multiple files, each of which should contain at most 5 records, but the code I developed writes 7 records into some files.

MultiResourceSplitApplication.java

/**
 * Spring Boot entry point for the file-splitting batch application.
 *
 * <p>Batch processing is enabled here, and {@code DataSourceAutoConfiguration}
 * is excluded from Boot's auto-configuration.
 */
@EnableBatchProcessing
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class MultiResourceSplitApplication {

    /**
     * Boots the Spring application context.
     *
     * @param launchArgs command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] launchArgs) {
        SpringApplication.run(MultiResourceSplitApplication.class, launchArgs);
    }
}

MyJobConfig.java

package com.example;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.MultiResourceItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.builder.MultiResourceItemWriterBuilder;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.support.builder.ClassifierCompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.classify.Classifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;

/**
 * Batch job configuration: reads {@code employee.csv} from the classpath and
 * hands every {@link Employee} to a role-specific writer chosen by an
 * {@link EmployeeClassifier}. Each role-specific writer is configured with an
 * item-count limit of 5 per output resource.
 */
@Configuration
public class MyJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Reader for the comma-delimited employee file; the first (header) line is skipped.
     */
    @Bean
    public FlatFileItemReader<Employee> itemReader() {
        DelimitedLineTokenizer columnTokenizer = new DelimitedLineTokenizer();
        columnTokenizer.setNames("empId", "firstName", "lastName", "role");

        DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(columnTokenizer);
        lineMapper.setFieldSetMapper(new EmployeeFieldSetMapper());
        lineMapper.afterPropertiesSet();

        FlatFileItemReaderBuilder<Employee> readerBuilder = new FlatFileItemReaderBuilder<>();
        return readerBuilder
                .name("flatFileReader")
                .linesToSkip(1)
                .resource(new ClassPathResource("employee.csv"))
                .lineMapper(lineMapper)
                .build();
    }

    /**
     * Composite writer that delegates each item to the writer picked by the classifier.
     */
    @Bean
    public ClassifierCompositeItemWriter<Employee> classifierCompositeItemWriter() throws Exception {
        ClassifierCompositeItemWriterBuilder<Employee> compositeBuilder =
                new ClassifierCompositeItemWriterBuilder<>();
        return compositeBuilder
                .classifier(new EmployeeClassifier(
                        javaDeveloperItemWriter(),
                        pythonDeveloperItemWriter(),
                        cloudDeveloperItemWriter()))
                .build();
    }

    /** Writer for employees routed to the Java developer output files. */
    @Bean
    public ItemWriter<Employee> javaDeveloperItemWriter() {
        return rollingFileWriter("javaDeveloperItemWriter", "iw1", "javaDeveloper-employee.csv");
    }

    /** Writer for employees routed to the Python developer output files. */
    @Bean
    public ItemWriter<Employee> pythonDeveloperItemWriter() {
        return rollingFileWriter("pythonDeveloperItemWriter", "iw2", "pythonDeveloper-employee.csv");
    }

    /** Writer for employees routed to the cloud developer output files. */
    @Bean
    public ItemWriter<Employee> cloudDeveloperItemWriter() {
        return rollingFileWriter("cloudDeveloperItemWriter", "iw3", "cloudDeveloper-employee.csv");
    }

    /**
     * Single chunk-oriented step (chunk size 3) wiring the reader to the composite writer.
     */
    @Bean
    public Step step() throws Exception {
        return stepBuilderFactory.get("step")
                .<Employee, Employee>chunk(3)
                .reader(itemReader())
                .writer(classifierCompositeItemWriter())
                .build();
    }

    /** Job consisting of the single step above. */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .start(step())
                .build();
    }

    /**
     * Builds a multi-resource writer around a pass-through flat-file delegate.
     *
     * @param writerName   name given to the multi-resource writer
     * @param delegateName name given to the flat-file delegate
     * @param basePath     base output path; "-" plus the resource index is appended to it
     * @return the configured writer
     */
    private ItemWriter<Employee> rollingFileWriter(String writerName, String delegateName, String basePath) {
        FlatFileItemWriter<Employee> flatFileDelegate = new FlatFileItemWriterBuilder<Employee>()
                .name(delegateName)
                .lineAggregator(new PassThroughLineAggregator<>())
                .build();

        return new MultiResourceItemWriterBuilder<Employee>()
                .name(writerName)
                .delegate(flatFileDelegate)
                .resource(new FileSystemResource(basePath))
                .itemCountLimitPerResource(5)
                .resourceSuffixCreator(index -> "-" + index)
                .build();
    }
}

EmployeeClassifier.java

import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;

import lombok.Setter;


/**
 * Picks the {@link ItemWriter} that should receive an {@link Employee}, based on
 * the employee's role.
 *
 * <p>"Java Developer" and "Python Developer" go to their dedicated writers; every
 * other role, including a missing ({@code null}) role, goes to the cloud
 * developer writer.
 */
@Setter
public class EmployeeClassifier implements Classifier<Employee, ItemWriter<? super Employee>> {
    private static final long serialVersionUID = 1L;

    /** Role value routed to the Java developer writer. */
    private static final String JAVA_DEVELOPER_ROLE = "Java Developer";
    /** Role value routed to the Python developer writer. */
    private static final String PYTHON_DEVELOPER_ROLE = "Python Developer";

    private ItemWriter<Employee> javaDeveloperFileItemWriter;
    private ItemWriter<Employee> pythonDeveloperFileItemWriter;
    private ItemWriter<Employee> cloudDeveloperFileItemWriter;

    /**
     * Creates a classifier with no writers assigned; they are expected to be
     * supplied through the generated setters before use.
     */
    public EmployeeClassifier() {

    }

    /**
     * Creates a fully wired classifier.
     *
     * @param javaDeveloperFileItemWriter   writer for "Java Developer" employees
     * @param pythonDeveloperFileItemWriter writer for "Python Developer" employees
     * @param cloudDeveloperFileItemWriter  writer for all remaining employees
     */
    public EmployeeClassifier(ItemWriter<Employee> javaDeveloperFileItemWriter,
                              ItemWriter<Employee> pythonDeveloperFileItemWriter,
                              ItemWriter<Employee> cloudDeveloperFileItemWriter) {
        this.javaDeveloperFileItemWriter = javaDeveloperFileItemWriter;
        this.pythonDeveloperFileItemWriter = pythonDeveloperFileItemWriter;
        this.cloudDeveloperFileItemWriter = cloudDeveloperFileItemWriter;
    }

    /**
     * Selects the writer for the given employee.
     *
     * @param employee the item about to be written
     * @return the writer matching the employee's role, or the cloud developer
     *         writer when the role is anything else
     */
    @Override
    public ItemWriter<? super Employee> classify(Employee employee) {
        String role = employee.getRole();
        // Constant-first comparison: a record with a null role falls through to
        // the default writer instead of throwing a NullPointerException.
        if (JAVA_DEVELOPER_ROLE.equals(role)) {
            return javaDeveloperFileItemWriter;
        }
        if (PYTHON_DEVELOPER_ROLE.equals(role)) {
            return pythonDeveloperFileItemWriter;
        }
        return cloudDeveloperFileItemWriter;
    }
}

Employee.java

/**
 * One employee record with the four columns of the input file.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Employee {
    private String empId;
    private String firstName;
    private String lastName;
    private String role;

    /**
     * Renders the record as a comma-separated line in the order
     * empId, firstName, lastName, role.
     */
    @Override
    public String toString() {
        return String.join(",", empId, firstName, lastName, role);
    }
}

EmployeeFieldSetMapper.java

/**
 * Maps a tokenized input line to an {@link Employee}, reading each column by
 * name as a raw string.
 */
public class EmployeeFieldSetMapper implements FieldSetMapper<Employee> {

    /**
     * Builds an {@link Employee} from the named columns of the field set.
     *
     * @param fieldSet tokenized line with columns empId, firstName, lastName and role
     * @return the populated employee
     */
    @Override
    public Employee mapFieldSet(FieldSet fieldSet) throws BindException {
        String empId = fieldSet.readRawString("empId");
        String firstName = fieldSet.readRawString("firstName");
        String lastName = fieldSet.readRawString("lastName");
        String role = fieldSet.readRawString("role");

        return new Employee(empId, firstName, lastName, role);
    }
}

employee.csv

empId,firstName,lastName,role
1,Mike ,Doe,Java Developer
2,Matt ,Doe,Java Developer
3,Deepak ,Doe,Python Developer
4,Neha ,Doe,Python Developer
5,Harish,Doe,Python Developer
6,Parag ,Doe,Python Developer
7,Harshita ,Doe,Python Developer
8,Pranali ,Doe,Python Developer
9,Raj ,Doe,Python Developer
10,Ravi,Doe,Python Developer
11,Gagan,Doe,Java Developer
12,Ashish ,Doe,Java Developer
13,Rajesh,Doe,Java Developer
14,Anosh ,Doe,Java Developer
15,Arpit ,Doe,Java Developer
16,Sneha ,Doe,Java Developer
17,Sneha ,Doe,Java Developer

Output: javaDeveloper-employee.csv-1

1,Mike ,Doe,Java Developer
2,Matt ,Doe,Java Developer
11,Gagan,Doe,Java Developer
12,Ashish ,Doe,Java Developer
13,Rajesh,Doe,Java Developer
14,Anosh ,Doe,Java Developer
15,Arpit ,Doe,Java Developer

javaDeveloper-employee.csv-2

16,Sneha ,Doe,Java Developer
17,Sneha ,Doe,Java Developer

pythonDeveloper-employee.csv-1

3,Deepak ,Doe,Python Developer
4,Neha ,Doe,Python Developer
5,Harish,Doe,Python Developer
6,Parag ,Doe,Python Developer
7,Harshita ,Doe,Python Developer
8,Pranali ,Doe,Python Developer
9,Raj ,Doe,Python Developer

pythonDeveloper-employee.csv-2

10,Ravi,Doe,Python Developer

Solution

  • Finally, found the issue.

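    Root cause: MultiResourceItemWriter only rolls over to a new resource at chunk boundaries. The item count is compared with .itemCountLimitPerResource(5) only after a whole chunk has been written, so with .<Employee, Employee>chunk(3) a file that already holds 4 records can still receive 3 more and end up with 7, which is exactly what you see in your output.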

    That's why the POC was not working.


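    Note: You don't need .<Employee, Employee>chunk(3) here. itemCountLimitPerResource does not create chunks itself; it is only checked after each chunk is written. chunk(0) does not disable chunking either: Spring Batch falls back to its default commit interval of 1, so every chunk holds a single record, the limit is checked after every record, and at most 5 records end up in every file. The trade-off is one transaction per record, which is slower on large inputs.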

    Fix/Solution: Use .<Employee, Employee>chunk(0) (which behaves the same as chunk(1)) in the MyJobConfig class.

    Updated MyJobConfig:

    /**
     * Batch job configuration that splits {@code employee.csv} into role-specific
     * output files holding at most 5 records each.
     *
     * <p>{@link MultiResourceItemWriter} only switches to a new output file at a
     * chunk boundary, so the step uses a chunk size of 1 to make the per-file
     * limit exact.
     */
    @Configuration
    public class MyJobConfig {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;
        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        /**
         * Reader for the comma-delimited employee file on the classpath; the
         * header line is skipped.
         */
        @Bean
        public FlatFileItemReader<Employee> itemReader() {
            DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
            tokenizer.setNames("empId", "firstName", "lastName", "role");

            DefaultLineMapper<Employee> employeeLineMapper = new DefaultLineMapper<>();
            employeeLineMapper.setLineTokenizer(tokenizer);
            employeeLineMapper.setFieldSetMapper(new EmployeeFieldSetMapper());
            employeeLineMapper.afterPropertiesSet();

            return new FlatFileItemReaderBuilder<Employee>()
                    .name("flatFileReader")
                    .linesToSkip(1)
                    .resource(new ClassPathResource("employee.csv"))
                    .lineMapper(employeeLineMapper)
                    .build();
        }

        /**
         * Composite writer that routes each employee to the writer selected by
         * {@link EmployeeClassifier}.
         */
        @Bean
        public ClassifierCompositeItemWriter<Employee> classifierCompositeItemWriter() throws Exception {
            Classifier<Employee, ItemWriter<? super Employee>> classifier = new EmployeeClassifier(
                    javaDeveloperItemWriter(),
                    pythonDeveloperItemWriter(),
                    cloudDeveloperItemWriter());

            return new ClassifierCompositeItemWriterBuilder<Employee>()
                    .classifier(classifier)
                    .build();
        }

        /**
         * Writer for Java developers. The concrete type is exposed so the step
         * can register it as a stream.
         */
        @Bean
        public MultiResourceItemWriter<Employee> javaDeveloperItemWriter() {
            FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
                    .lineAggregator(new PassThroughLineAggregator<>())
                    .name("iw1")
                    .build();

            return new MultiResourceItemWriterBuilder<Employee>()
                    .name("javaDeveloperItemWriter")
                    .delegate(itemWriter)
                    .resource(new FileSystemResource("/users/anisb/bounty-folder/javaDeveloper-employee.csv"))
                    .itemCountLimitPerResource(5)
                    .resourceSuffixCreator(index -> "-" + index)
                    .build();
        }

        /**
         * Writer for Python developers. The concrete type is exposed so the step
         * can register it as a stream.
         */
        @Bean
        public MultiResourceItemWriter<Employee> pythonDeveloperItemWriter() {
            FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
                    .lineAggregator(new PassThroughLineAggregator<>())
                    .name("iw2")
                    .build();

            return new MultiResourceItemWriterBuilder<Employee>()
                    .name("pythonDeveloperItemWriter")
                    .delegate(itemWriter)
                    .resource(new FileSystemResource("/users/anisb/bounty-folder/pythonDeveloper-employee.csv"))
                    .itemCountLimitPerResource(5)
                    .resourceSuffixCreator(index -> "-" + index)
                    .build();
        }

        /**
         * Writer for all remaining roles. The concrete type is exposed so the
         * step can register it as a stream.
         */
        @Bean
        public MultiResourceItemWriter<Employee> cloudDeveloperItemWriter() {
            FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
                    .lineAggregator(new PassThroughLineAggregator<>())
                    .name("iw3")
                    .build();

            return new MultiResourceItemWriterBuilder<Employee>()
                    .name("cloudDeveloperItemWriter")
                    .delegate(itemWriter)
                    .resource(new FileSystemResource("/users/anisb/bounty-folder/cloudDeveloper-employee.csv"))
                    .itemCountLimitPerResource(5)
                    .resourceSuffixCreator(index -> "-" + index)
                    .build();
        }

        /**
         * Step that reads employees and writes them through the composite writer,
         * one item per chunk.
         */
        @Bean
        public Step step() throws Exception {
            return stepBuilderFactory.get("step")
                    // Chunk size 1: the per-file limit is checked after every
                    // chunk, so a larger chunk can overshoot it. chunk(0) gives
                    // the same result only because Spring Batch falls back to
                    // its default commit interval of 1; stating 1 makes the
                    // intent explicit.
                    .<Employee, Employee>chunk(1)
                    .reader(itemReader())
                    .writer(classifierCompositeItemWriter())
                    // ClassifierCompositeItemWriter is not an ItemStream, so the
                    // delegates must be registered explicitly for the step to
                    // open them, save their restart state and close them.
                    .stream(javaDeveloperItemWriter())
                    .stream(pythonDeveloperItemWriter())
                    .stream(cloudDeveloperItemWriter())
                    .build();
        }

        /** Job consisting of the single step above. */
        @Bean
        public Job job() throws Exception {
            return jobBuilderFactory.get("job")
                    .start(step())
                    .build();
        }
    }
    

    Tested on my local:

    enter image description here

    javaDeveloper-employee.csv-1

    1,Mike ,Doe,Java Developer
    2,Matt ,Doe,Java Developer
    11,Gagan,Doe,Java Developer
    12,Ashish ,Doe,Java Developer
    13,Rajesh,Doe,Java Developer
    

    javaDeveloper-employee.csv-2:

    14,Anosh ,Doe,Java Developer
    15,Arpit ,Doe,Java Developer
    16,Sneha ,Doe,Java Developer
    17,Sneha ,Doe,Java Developer
    

    pythonDeveloper-employee.csv-1:

    3,Deepak ,Doe,Python Developer
    4,Neha ,Doe,Python Developer
    5,Harish,Doe,Python Developer
    6,Parag ,Doe,Python Developer
    7,Harshita ,Doe,Python Developer
    

    pythonDeveloper-employee.csv-2:

    8,Pranali ,Doe,Python Developer
    9,Raj ,Doe,Python Developer
    10,Ravi,Doe,Python Developer