Search code examples
Tags: spring-boot, spring-batch

Spring Batch: partitioning vs. parallel steps


I need your help. Here is the workflow I want to implement:

  1. For each of the 5 targets (stores), call a REST API to get the total number of orders and the list of order IDs, and save them to the repository.

  2. Read the saved order counts and order IDs in chunks. For each of the 5 stores, call another REST API, passing the order count and order IDs as parameters, to get the order list. Save the result to the repository.

How should I structure this in Spring Batch: with partitioning, parallel steps, or something else? Can you help me organize the overall process?

I am currently learning about partitioning and parallel steps.


Solution

    • Define a Job with two main steps that run one after the other, because Step 2 uses the data that Step 1 saves. Step 1: Fetch order data from the REST APIs. Step 2: Call another REST API to get the order list and save it to the repository.

    • Implement partitioning for each step (one partition per store) so that the stores are processed in parallel.

    Define the Job:

    /**
     * Skeleton job configuration. Step 1 fetches order counts and order IDs per store. Step 2 uses
     * them to fetch the order lists.
     *
     * <p>The steps are chained with {@code next}, so Step 2 starts only after Step 1 has completed.
     * The data dependency between them requires this ordering.
     */
    @Configuration
    @EnableBatchProcessing
    public class OrderProcessingJobConfig {
        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        // Define your reader, processor, and writer beans

        /** Runs {@link #fetchOrderDataStep()} and then {@link #getOrderListStep()}, sequentially. */
        @Bean
        public Job orderProcessingJob() {
            return jobBuilderFactory.get("orderProcessingJob")
                    .start(fetchOrderDataStep())
                    .next(getOrderListStep())
                    .build();
        }

        /**
         * Step 1: fetch order counts and order IDs from the REST APIs.
         *
         * @return the configured step (not yet implemented)
         */
        @Bean
        public Step fetchOrderDataStep() {
            // Step 1 configuration goes here. A @Bean method must return a value; throwing keeps
            // the skeleton compilable and fails fast if it is wired before being implemented.
            throw new UnsupportedOperationException("TODO: configure fetchOrderDataStep");
        }

        /**
         * Step 2: call another REST API to get the order list and save it to the repository.
         *
         * @return the configured step (not yet implemented)
         */
        @Bean
        public Step getOrderListStep() {
            // Step 2 configuration goes here (see the note in fetchOrderDataStep).
            throw new UnsupportedOperationException("TODO: configure getOrderListStep");
        }
    }
    

    Implement partitioning for each step:

    /**
     * Step 1 manager step. Splits the work with {@link #partitioner()} and runs
     * {@code fetchOrderDataStepSlave} once per partition on {@link #taskExecutor()}.
     *
     * @return the partitioned manager step
     */
    @Bean
    public Step fetchOrderDataStep() {
        return stepBuilderFactory.get("fetchOrderDataStep")
                // The first argument names the WORKER step. Spring Batch uses it as the prefix of
                // each partition's step execution name.
                .partitioner("fetchOrderDataStepSlave", partitioner())
                .step(fetchOrderDataStepSlave())
                // gridSize is only the hint passed to Partitioner#partition(int). It is not a
                // thread count; concurrency is governed by the TaskExecutor below.
                .gridSize(5)
                .taskExecutor(taskExecutor())
                .build();
    }
    
    /**
     * Partitioner for the manager steps. It is expected to emit one partition per store, each
     * carrying a {@code targetStore} entry for the step-scoped readers (TODO: confirm in
     * StorePartitioner).
     *
     * @return a new {@code StorePartitioner}
     */
    @Bean
    public Partitioner partitioner() {
        final Partitioner storePartitioner = new StorePartitioner();
        return storePartitioner;
    }
    
    /**
     * Step 1 worker step, executed once per partition. Reads, processes and writes items in
     * chunks of five.
     *
     * @return the worker step
     */
    @Bean
    public Step fetchOrderDataStepSlave() {
        final int chunkSize = 5;
        // null is a placeholder: reader(...) is @StepScope, so its targetStore is injected from
        // the partition's step execution context at runtime.
        return stepBuilderFactory
                .get("fetchOrderDataStepSlave")
                .<InputType, OutputType>chunk(chunkSize)
                .reader(reader(null))
                .processor(processor())
                .writer(writer())
                .build();
    }
    
    /**
     * Step-scoped REST API reader for Step 1. {@code targetStore} is late-bound from
     * {@code stepExecutionContext['targetStore']}. Callers pass {@code null} at configuration
     * time only as a placeholder.
     *
     * <p>NOTE(review): if the implementation is an {@code ItemStream}, consider declaring that
     * more specific return type so the step-scoped proxy exposes {@code open}/{@code close}.
     *
     * @param targetStore store whose orders to read, injected from the step execution context
     * @return the reader (not yet implemented)
     */
    @Bean
    @StepScope
    public ItemReader<InputType> reader(@Value("#{stepExecutionContext['targetStore']}") String targetStore) {
        // Implement your REST API reader. A @Bean method must return a value; throwing keeps this
        // placeholder compilable and fails fast if the step runs before it is implemented.
        throw new UnsupportedOperationException(
                "TODO: implement REST API reader for targetStore=" + targetStore);
    }
    
    /**
     * Executor used by the partitioned manager steps to run partitions concurrently.
     * {@link SimpleAsyncTaskExecutor} starts a new thread per submitted task.
     *
     * @return a new {@code SimpleAsyncTaskExecutor}
     */
    @Bean
    public TaskExecutor taskExecutor() {
        final SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        return executor;
    }
    

    Call another REST API to get the order list and save it to the repository.

    /**
     * Step 2 manager step. Splits the work with {@link #partitioner()} and runs
     * {@code getOrderListStepSlave} once per partition on {@link #taskExecutor()}.
     *
     * @return the partitioned manager step
     */
    @Bean
    public Step getOrderListStep() {
        return stepBuilderFactory.get("getOrderListStep")
                // The first argument names the WORKER step. Spring Batch uses it as the prefix of
                // each partition's step execution name.
                .partitioner("getOrderListStepSlave", partitioner())
                .step(getOrderListStepSlave())
                // gridSize is only the hint passed to Partitioner#partition(int). It is not a
                // thread count; concurrency is governed by the TaskExecutor below.
                .gridSize(5)
                .taskExecutor(taskExecutor())
                .build();
    }
    
    /**
     * Step 2 worker step, executed once per partition. Reads, processes and writes order-list
     * items in chunks of five.
     *
     * @return the worker step
     */
    @Bean
    public Step getOrderListStepSlave() {
        final int chunkSize = 5;
        // null is a placeholder: orderListReader(...) is @StepScope, so its targetStore is
        // injected from the partition's step execution context at runtime.
        return stepBuilderFactory
                .get("getOrderListStepSlave")
                .<InputType, OutputType>chunk(chunkSize)
                .reader(orderListReader(null))
                .processor(orderListProcessor())
                .writer(orderListWriter())
                .build();
    }
    
    /**
     * Step-scoped REST API reader for Step 2's order list. {@code targetStore} is late-bound from
     * {@code stepExecutionContext['targetStore']}. Callers pass {@code null} at configuration
     * time only as a placeholder.
     *
     * @param targetStore store whose order list to read, injected from the step execution context
     * @return the reader (not yet implemented)
     */
    @Bean
    @StepScope
    public ItemReader<InputType> orderListReader(@Value("#{stepExecutionContext['targetStore']}") String targetStore) {
        // Implement your REST API reader for the order list. A @Bean method must return a value;
        // throwing keeps this placeholder compilable and fails fast if the step runs before it is
        // implemented.
        throw new UnsupportedOperationException(
                "TODO: implement order-list REST API reader for targetStore=" + targetStore);
    }
    

    Alternative for the new requirement mentioned in the comments: instead of partitioning, use parallel flows with one step per store.

    Define a Job with two main steps (as before): a. Step 1: Fetch order data from the REST APIs. b. Step 2: Call another REST API to get the order list and save it to the repository.

    Use split Flows so that, within each phase, the steps for the individual target stores run in parallel.

    /**
     * Alternative job configuration that uses parallel flows instead of partitioning. Each phase
     * has one step per store, and the five steps of a phase run concurrently on
     * {@link #taskExecutor()}.
     *
     * <p>The two phases run one after the other, because the order-list phase consumes the data
     * that the fetch phase saves.
     */
    @Configuration
    @EnableBatchProcessing
    public class OrderProcessingJobConfig {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        // Define your reader, processor, and writer beans

        /**
         * Runs {@link #fetchOrderDataFlow()} and, once it has completed,
         * {@link #getOrderListFlow()}.
         *
         * @return the job
         */
        @Bean
        public Job orderProcessingJob() {
            return jobBuilderFactory.get("orderProcessingJob")
                    .start(fetchOrderDataFlow())
                    // Sequential on purpose: splitting here would start phase 2 before phase 1
                    // had saved the data phase 2 needs.
                    .next(getOrderListFlow())
                    .end()
                    .build();
        }

        /**
         * Phase 1: runs the five per-store fetch steps in parallel.
         *
         * <p>{@code SplitBuilder#add} accepts {@link Flow}s, not steps, so each step is wrapped in
         * its own single-step flow.
         *
         * @return the split flow
         */
        @Bean
        public Flow fetchOrderDataFlow() {
            return new FlowBuilder<SimpleFlow>("fetchOrderDataFlow")
                    .split(taskExecutor())
                    .add(
                            new FlowBuilder<SimpleFlow>("fetchOrderDataStep1Flow").start(fetchOrderDataStep1()).build(),
                            new FlowBuilder<SimpleFlow>("fetchOrderDataStep2Flow").start(fetchOrderDataStep2()).build(),
                            new FlowBuilder<SimpleFlow>("fetchOrderDataStep3Flow").start(fetchOrderDataStep3()).build(),
                            new FlowBuilder<SimpleFlow>("fetchOrderDataStep4Flow").start(fetchOrderDataStep4()).build(),
                            new FlowBuilder<SimpleFlow>("fetchOrderDataStep5Flow").start(fetchOrderDataStep5()).build())
                    .build();
        }

        /**
         * Phase 2: runs the five per-store order-list steps in parallel.
         *
         * <p>Each step is wrapped in a single-step flow because {@code SplitBuilder#add} accepts
         * only {@link Flow}s.
         *
         * @return the split flow
         */
        @Bean
        public Flow getOrderListFlow() {
            return new FlowBuilder<SimpleFlow>("getOrderListFlow")
                    .split(taskExecutor())
                    .add(
                            new FlowBuilder<SimpleFlow>("getOrderListStep1Flow").start(getOrderListStep1()).build(),
                            new FlowBuilder<SimpleFlow>("getOrderListStep2Flow").start(getOrderListStep2()).build(),
                            new FlowBuilder<SimpleFlow>("getOrderListStep3Flow").start(getOrderListStep3()).build(),
                            new FlowBuilder<SimpleFlow>("getOrderListStep4Flow").start(getOrderListStep4()).build(),
                            new FlowBuilder<SimpleFlow>("getOrderListStep5Flow").start(getOrderListStep5()).build())
                    .build();
        }

        /**
         * Executor for the split flows. {@link SimpleAsyncTaskExecutor} starts a new thread per
         * task.
         *
         * @return a new {@code SimpleAsyncTaskExecutor}
         */
        @Bean
        public TaskExecutor taskExecutor() {
            return new SimpleAsyncTaskExecutor();
        }
    }
    

    2. Define separate step beans for each target store:

    // Step 1: Fetch order data from the REST APIs
    
    /** Fetch step for {@code store1}, built by {@code createFetchOrderDataStep}. */
    @Bean
    public Step fetchOrderDataStep1() {
        final String store = "store1";
        return createFetchOrderDataStep("fetchOrderDataStep1", store);
    }
    
    /** Fetch step for {@code store2}, built by {@code createFetchOrderDataStep}. */
    @Bean
    public Step fetchOrderDataStep2() {
        final String store = "store2";
        return createFetchOrderDataStep("fetchOrderDataStep2", store);
    }
    
    /** Fetch step for {@code store3}, built by {@code createFetchOrderDataStep}. */
    @Bean
    public Step fetchOrderDataStep3() {
        final String store = "store3";
        return createFetchOrderDataStep("fetchOrderDataStep3", store);
    }
    
    /** Fetch step for {@code store4}, built by {@code createFetchOrderDataStep}. */
    @Bean
    public Step fetchOrderDataStep4() {
        final String store = "store4";
        return createFetchOrderDataStep("fetchOrderDataStep4", store);
    }
    
    /** Fetch step for {@code store5}, built by {@code createFetchOrderDataStep}. */
    @Bean
    public Step fetchOrderDataStep5() {
        final String store = "store5";
        return createFetchOrderDataStep("fetchOrderDataStep5", store);
    }
    
    /**
     * Builds the chunk-oriented fetch step for a single store.
     *
     * <p>{@code reader} is a {@code @StepScope} bean whose {@code targetStore} is injected from
     * {@code stepExecutionContext['targetStore']}. A value passed to {@code reader(...)} here
     * never reaches that instance. Without a partitioner, nothing else sets that key, so a
     * listener seeds it into this step's execution context before the step opens its reader.
     *
     * @param stepName    unique step name
     * @param targetStore store this step fetches order data for
     * @return the configured step
     */
    private Step createFetchOrderDataStep(String stepName, String targetStore) {
        return stepBuilderFactory.get(stepName)
                .listener(new org.springframework.batch.core.listener.StepExecutionListenerSupport() {
                    @Override
                    public void beforeStep(org.springframework.batch.core.StepExecution stepExecution) {
                        // Runs before the step opens its reader, so the step-scoped reader sees it.
                        stepExecution.getExecutionContext().putString("targetStore", targetStore);
                    }
                })
                .<InputType, OutputType>chunk(5)
                // null is a placeholder; the real value comes from the step execution context.
                .reader(reader(null))
                .processor(processor())
                .writer(writer())
                .build();
    }
    

    // Step 2: Call another REST API to get the order list and save it to the repository

    /** Order-list step for {@code store1}, built by {@code createGetOrderListStep}. */
    @Bean
    public Step getOrderListStep1() {
        final String store = "store1";
        return createGetOrderListStep("getOrderListStep1", store);
    }
    
    /** Order-list step for {@code store2}, built by {@code createGetOrderListStep}. */
    @Bean
    public Step getOrderListStep2() {
        final String store = "store2";
        return createGetOrderListStep("getOrderListStep2", store);
    }
    
    /** Order-list step for {@code store3}, built by {@code createGetOrderListStep}. */
    @Bean
    public Step getOrderListStep3() {
        final String store = "store3";
        return createGetOrderListStep("getOrderListStep3", store);
    }
    
    /** Order-list step for {@code store4}, built by {@code createGetOrderListStep}. */
    @Bean
    public Step getOrderListStep4() {
        final String store = "store4";
        return createGetOrderListStep("getOrderListStep4", store);
    }
    
    /** Order-list step for {@code store5}, built by {@code createGetOrderListStep}. */
    @Bean
    public Step getOrderListStep5() {
        final String store = "store5";
        return createGetOrderListStep("getOrderListStep5", store);
    }
    
    /**
     * Builds the chunk-oriented order-list step for a single store.
     *
     * <p>{@code orderListReader} is a {@code @StepScope} bean whose {@code targetStore} is
     * injected from {@code stepExecutionContext['targetStore']}. A value passed to
     * {@code orderListReader(...)} here never reaches that instance. Without a partitioner,
     * nothing else sets that key, so a listener seeds it into this step's execution context
     * before the step opens its reader.
     *
     * @param stepName    unique step name
     * @param targetStore store this step fetches the order list for
     * @return the configured step
     */
    private Step createGetOrderListStep(String stepName, String targetStore) {
        return stepBuilderFactory.get(stepName)
                .listener(new org.springframework.batch.core.listener.StepExecutionListenerSupport() {
                    @Override
                    public void beforeStep(org.springframework.batch.core.StepExecution stepExecution) {
                        // Runs before the step opens its reader, so the step-scoped reader sees it.
                        stepExecution.getExecutionContext().putString("targetStore", targetStore);
                    }
                })
                .<InputType, OutputType>chunk(5)
                // null is a placeholder; the real value comes from the step execution context.
                .reader(orderListReader(null))
                .processor(orderListProcessor())
                .writer(orderListWriter())
                .build();
    }