Search code examples
Tags: java, spring, batch-processing, spring-batch

Spring Batch: how to process a list of data before writing it in a Step


I am trying to read client data from a database and write processed data to a flat file. But I need to process the whole result of the ItemReader before writing the data.

For example, I am reading Client rows from the database:

/**
 * A client row read from the database; the input item of the batch step.
 *
 * <p>NOTE(review): the {@code builder()} and getters used by the job configuration are not
 * declared here; presumably they are generated (e.g. by Lombok) — confirm.
 */
public class Client {
    private String id;
    // Subscription this client belongs to; the key the activated-user counts are grouped by.
    private String subscriptionCode;
    // Whether this client counts as an activated user. Boxed type, so it may be null.
    private Boolean activated;
}

But I want to count how many users are activated, grouped by subscriptionCode, and write that count:

/**
 * Output record: the number of activated clients for one subscription code.
 *
 * <p>NOTE(review): the solution further down builds this with {@code new AtomicInteger(1)} and
 * calls {@code getActivatedUserCount().incrementAndGet()}, so it appears to expect an
 * {@code AtomicInteger} field rather than {@code Integer} — confirm which version is current.
 */
public class Subscription {
    private String subscriptionCode;
    // Count of activated clients with this subscriptionCode.
    private Integer activatedUserCount;
}

I don't know how to do that using ItemReader/ItemProcessor/ItemWriter. Can you help me?

BatchConfiguration:

@CommonsLog
@Configuration
@EnableBatchProcessing
@EnableAutoConfiguration
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Client, Client> chunk(1000)
                .reader(new ListItemReader<Client>(new ArrayList<Client>() { // Just for test
                    {
                        add(Client.builder().id("1").subscriptionCode("AA").activated(true).build());
                        add(Client.builder().id("2").subscriptionCode("BB").activated(true).build());
                        add(Client.builder().id("3").subscriptionCode("AA").activated(false).build());
                        add(Client.builder().id("4").subscriptionCode("AA").activated(true).build());
                    }
                }))
                .processor(new ItemProcessor<Client, Client>() {
                    public Client process(Client item) throws Exception {
                        log.info(item);
                        return item;
                    }
                })
                .writer(new ItemWriter<Client>() {
                    public void write(List<? extends Client> items) throws Exception {
                        // Only here I can use List of Client
                        // How can I process this list before to fill Subscription objects ?
                    }
                })
                .build();
    }

    @Bean
    public Job job1(Step step1) throws Exception {
        return jobBuilderFactory.get("job1").incrementer(new RunIdIncrementer()).start(step1).build();
    }
}

Main application:

/**
 * Application entry point: starts a Spring context from {@link BatchConfiguration}, then exits
 * the JVM with the exit code returned by {@code SpringApplication.exit} for that context.
 */
public class App {
    public static void main(String[] args) {
        // SpringApplication.run and SpringApplication.exit declare no checked exceptions, so the
        // former throws clause (four Spring Batch launch exceptions) could never actually fire.
        System.exit(SpringApplication.exit(SpringApplication.run(BatchConfiguration.class, args)));
    }
}

Solution

  • I found a solution based on an ItemProcessor:

    /**
     * Counts activated clients per subscription code.
     *
     * <p>The processor keeps one {@code Subscription} per code for the whole step:
     * <ul>
     *   <li>The first client seen for a code emits a new {@code Subscription}. Its counter
     *       starts at 1 if that client is activated and 0 otherwise.</li>
     *   <li>Later clients with the same code only increment that object's counter. They return
     *       {@code null}, so they are filtered out and not written.</li>
     * </ul>
     *
     * <p>NOTE(review): a {@code Subscription} reaches the writer in the chunk where its code first
     * appears. Increments made while later chunks are processed mutate an object that has already
     * been written. The output is therefore only complete when all clients of a code fall into
     * that same chunk, which holds for this 4-item test list with chunk size 1000. For real data,
     * aggregate before the step (e.g. SQL {@code GROUP BY}) or write the totals once the step has
     * finished.
     */
    @Bean
    public Step step1() {
      // Just for test: in-memory clients instead of a database reader.
      List<Client> clients = new ArrayList<Client>();
      clients.add(Client.builder().id("1").subscriptionCode("AA").activated(true).build());
      clients.add(Client.builder().id("2").subscriptionCode("BB").activated(true).build());
      clients.add(Client.builder().id("3").subscriptionCode("AA").activated(false).build());
      clients.add(Client.builder().id("4").subscriptionCode("AA").activated(true).build());

      return stepBuilderFactory.get("step1")
          .<Client, Subscription> chunk(1000)
          .reader(new ListItemReader<Client>(clients))
          .processor(new ItemProcessor<Client, Subscription>() {
            // Subscription per code seen so far in this step. The concurrent map is safe to share
            // if the step runs multi-threaded, and putIfAbsent makes "first client for a code"
            // atomic. Lookup is O(1) instead of a linear scan.
            // Assumes subscription codes are non-null: ConcurrentHashMap rejects null keys.
            private final java.util.concurrent.ConcurrentMap<String, Subscription> subscriptions =
                new java.util.concurrent.ConcurrentHashMap<String, Subscription>();

            @Override
            public Subscription process(Client item) throws Exception {
              String code = item.getSubscriptionCode();
              // Null-safe: a null "activated" flag counts as not activated instead of throwing
              // a NullPointerException on unboxing.
              boolean activated = Boolean.TRUE.equals(item.getActivated());

              Subscription existing = subscriptions.get(code);
              if (existing == null) {
                // Start at 0 when the first client of a code is not activated.
                Subscription created = Subscription.builder()
                    .subscriptionCode(code)
                    .activatedUserCount(new AtomicInteger(activated ? 1 : 0))
                    .build();
                existing = subscriptions.putIfAbsent(code, created);
                if (existing == null) {
                  log.info("New subscription : " + created);
                  return created;
                }
                // Another thread registered this code first: count against its entry instead.
              }
              if (activated) {
                existing.getActivatedUserCount().incrementAndGet(); // increment user count
                log.info("Incremented subscription : " + existing);
              }
              return null; // existing element -> filtered out, not written again
            }

            @BeforeStep
            public void initList() {
              // Start every step execution with an empty aggregation.
              subscriptions.clear();
            }

            @AfterStep
            public void clearList() {
              subscriptions.clear();
            }
          })
          .writer(new ItemWriter<Subscription>() {
            @Override
            public void write(List<? extends Subscription> items) throws Exception {
              log.info(items);
              // do write stuff
            }
          })
          .build();
    }
    

    But I have to maintain a second Subscription list inside the ItemProcessor, and I don't know whether that is thread-safe and efficient. What do you think about this solution?