java spring-batch spring-cloud-task

Spring Batch with multi-step Spring Cloud Task (PartitionHandler) for remote partitioning


Latest update (with an image that hopefully simplifies the problem; thanks to @Mahmoud for the feedback)

Related issue reports for reference (after this post was created, it seems someone filed issues against Spring Cloud Task for similar problems, so I am posting updates there too):

https://github.com/spring-cloud/spring-cloud-task/issues/793 relates to approach #1

https://github.com/spring-cloud/spring-cloud-task/issues/792 relates to approach #2

I also found a workaround for that issue and posted it on the GitHub issue; I will update this post once the developers confirm it is sound: https://github.com/spring-cloud/spring-cloud-task/issues/793#issuecomment-894617929

I am developing an application that involves multiple steps in a Spring Batch job, but I have hit a roadblock. I researched the docs and tried different approaches without success, so I am checking whether the community can shed some light.

Spring Batch job 1 (receives job parameters carrying the settings for step 1 and the settings for step 2)

    Step 1 -> remote partition (PartitionHandler (CPU/memory for step 1 + grid size) + Partitioner) with the settings for step 1 (from job configuration or step configuration)

    Step 2 -> remote partition (PartitionHandler (CPU/memory for step 2 + grid size) + Partitioner) with the settings for step 2 (from job configuration or step configuration, and different from step 1)

The reason is that we want different steps to run with different k8s settings (such as CPU/memory/grid size).


Attempts:

  1. Create two partition handlers (partitionHandlerReader + partitionHandlerProcessor) and their corresponding launchers (LauncherReader + LauncherProcessor)

The complete project can be found at https://github.com/danilko/spring-batch-remote-k8s-paritition-example/tree/attempt_1_two_partitionhandlers

The configuration has been simplified into a single main class: https://github.com/danilko/spring-batch-remote-k8s-paritition-example/blob/attempt_1_two_partitionhandlers/src/main/java/com/example/batchprocessing/BatchConfiguration.java

  2. Use one PartitionHandler + one TaskLauncher, but with @StepScope for late binding, so the setup can change dynamically based on the step and job


The complete project can be found at https://github.com/danilko/spring-batch-remote-k8s-paritition-example/tree/attempt_2_partitionhandler_with_stepscope

The configuration has been simplified into a single main class: https://github.com/danilko/spring-batch-remote-k8s-paritition-example/blob/attempt_2_partitionhandler_with_stepscope/src/main/java/com/example/batchprocessing/BatchConfiguration.java

Both attempts give the following result (full trace in the Git repos above):

When the job is triggered, it fails (it seems to pass initial startup, but errors during execution).

The error below only occurs when there are multiple PartitionHandler beans, or when that bean is at @StepScope or @JobScope.


java.lang.NullPointerException: null
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorker(DeployerPartitionHandler.java:347) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:302) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]

Full Log

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.6)

2021-08-06 11:24:29.242  INFO 90294 --- [           main] c.e.b.BatchProcessingApplication         : Starting BatchProcessingApplication v0.0.1-SNAPSHOT using Java 11.0.7 on localhost.localdomain with PID 90294 (/home/danilko/IdeaProjects/partition/target/batchprocessing-0.0.1-SNAPSHOT.jar started by danilko in /home/danilko/IdeaProjects/partition)
2021-08-06 11:24:29.244  INFO 90294 --- [           main] c.e.b.BatchProcessingApplication         : The following profiles are active: controller
2021-08-06 11:24:29.790  INFO 90294 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-08-06 11:24:29.794  INFO 90294 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-08-06 11:24:29.797  INFO 90294 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-08-06 11:24:29.833  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.947  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.947  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.959  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration' of type [org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration$$EnhancerBySpringCGLIB$$83e6c2be] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:29.968  INFO 90294 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration' of type [org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration$$EnhancerBySpringCGLIB$$cc3cccc1] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-06 11:24:30.093  INFO 90294 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2021-08-06 11:24:30.160  INFO 90294 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2021-08-06 11:24:30.724  INFO 90294 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: MYSQL
2021-08-06 11:24:30.736  INFO 90294 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2021-08-06 11:24:30.897  INFO 90294 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-06 11:24:30.897  INFO 90294 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2021-08-06 11:24:30.897  INFO 90294 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2021-08-06 11:24:30.974  INFO 90294 --- [           main] c.e.b.BatchProcessingApplication         : Started BatchProcessingApplication in 2.024 seconds (JVM running for 2.366)
2021-08-06 11:24:30.975  INFO 90294 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2021-08-06 11:24:31.010  INFO 90294 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=partitionedJob-1538890488]] launched with the following parameters: [{}]
Set readerGridSize == 1
2021-08-06 11:24:31.020  INFO 90294 --- [           main] o.s.c.t.b.l.TaskBatchExecutionListener   : The job execution id 22 was run within the task execution 54
2021-08-06 11:24:31.046  INFO 90294 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [partitionReaderStep]
2021-08-06 11:24:31.101 ERROR 90294 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step partitionReaderStep in job partitionedJob-1538890488

java.lang.NullPointerException: null
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorker(DeployerPartitionHandler.java:347) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.launchWorkers(DeployerPartitionHandler.java:313) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
    at org.springframework.cloud.task.batch.partition.DeployerPartitionHandler.handle(DeployerPartitionHandler.java:302) ~[spring-cloud-task-batch-2.3.1-SNAPSHOT.jar!/:2.3.1-SNAPSHOT]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at com.sun.proxy.$Proxy65.handle(Unknown Source) ~[na:na]
    at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:106) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:152) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:413) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:320) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:149) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.3.7.jar!/:5.3.7]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:128) ~[spring-batch-core-4.3.3.jar!/:4.3.3]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.7.jar!/:5.3.7]
    at com.sun.proxy.$Proxy51.run(Unknown Source) ~[na:na]
    at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.execute(JobLauncherApplicationRunner.java:199) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.executeLocalJobs(JobLauncherApplicationRunner.java:173) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.launchJobFromProperties(JobLauncherApplicationRunner.java:160) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:155) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:150) ~[spring-boot-autoconfigure-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:799) ~[spring-boot-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:789) ~[spring-boot-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:346) ~[spring-boot-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1329) ~[spring-boot-2.4.6.jar!/:2.4.6]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1318) ~[spring-boot-2.4.6.jar!/:2.4.6]
    at com.example.batchprocessing.BatchProcessingApplication.main(BatchProcessingApplication.java:10) ~[classes!/:0.0.1-SNAPSHOT]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:108) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
    at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88) ~[batchprocessing-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]


Study/Reference: Most tutorials I found online involve only one partitioned step. https://dataflow.spring.io/docs/feature-guides/batch/partitioning/

Thanks in advance for any info/help.


Solution

    1. Is the above setup even possible?

    Yes, nothing prevents you from having two partitioned steps in a single Spring Batch job.

    2. Is it possible to use JobScope/StepScope to pass info to the PartitionHandler?

    Yes, it is possible for the partition handler to be declared as a job/step scoped bean if it needs the late-binding feature to be configured.

    Updated on 08/14/2021 by @DanilKo

    The original answer is correct at a high level. However, to actually make the partition handler step scoped, a code modification is required.

    Below is the analysis plus my proposed workaround/fix (the code maintainers may eventually come up with a better way, but so far the fix below is working for me).

    The issue is still being discussed at https://github.com/spring-cloud/spring-cloud-task/issues/793 (multiple partition handlers) and https://github.com/spring-cloud/spring-cloud-task/issues/792 (which this fix is based on: using a step-scoped partition handler to configure different worker steps + resources + max workers).

    Root cause analysis (hypothesis)

    The problem is that DeployerPartitionHandler uses the @BeforeTask annotation so that the task passes in the TaskExecution object as part of task setup.

    But when the partition handler is at @StepScope (instead of directly at @Bean level with @EnableTask), or when there are two partition handlers, that setup is no longer triggered, as @EnableTask does not seem able to locate a single partition handler during creation.

    https://github.com/spring-cloud/spring-cloud-task/blob/main/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java @ 269

    As a result, the created DeployerPartitionHandler hits a null taskExecution when it tries to launch workers (as it was never set).

    https://github.com/spring-cloud/spring-cloud-task/blob/main/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java @ 347
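The hypothesis above can be illustrated with a small plain-Java model. This is only a sketch of the suspected lifecycle problem, not Spring Cloud Task code: `Handler` and `TaskLifecycle` are hypothetical stand-ins, and the assumption is that the callback only reaches objects that exist when the task starts.

```java
import java.util.ArrayList;
import java.util.List;

class BeforeTaskLifecycleSketch {

    /** Hypothetical stand-in for a handler that receives its task execution id via a callback. */
    static class Handler {
        private Long taskExecutionId; // stays null unless beforeTask(...) is called

        void beforeTask(long id) {
            this.taskExecutionId = id;
        }

        String launchWorker() {
            if (taskExecutionId == null) {
                throw new IllegalStateException("taskExecution was never set");
            }
            return "launched under task " + taskExecutionId;
        }
    }

    /** Hypothetical stand-in for the task lifecycle: it only notifies handlers it knows at startup. */
    static class TaskLifecycle {
        private final List<Handler> knownAtStartup = new ArrayList<>();

        void register(Handler handler) {
            knownAtStartup.add(handler);
        }

        void start(long taskExecutionId) {
            for (Handler handler : knownAtStartup) {
                handler.beforeTask(taskExecutionId);
            }
        }
    }

    public static void main(String[] args) {
        TaskLifecycle lifecycle = new TaskLifecycle();

        // Handler that exists at startup: it receives the callback.
        Handler eager = new Handler();
        lifecycle.register(eager);
        lifecycle.start(54L);
        System.out.println(eager.launchWorker());

        // Handler created later (like a step-scoped bean): the callback has already happened.
        Handler late = new Handler();
        try {
            late.launchWorker();
        } catch (IllegalStateException e) {
            System.out.println("late handler failed: " + e.getMessage());
        }

        // Calling the callback by hand fills the gap, which is the idea behind the workaround.
        late.beforeTask(54L);
        System.out.println(late.launchWorker());
    }
}
```

In this model the late handler only works once `beforeTask` is invoked manually, which mirrors what the workaround does with the real handler.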

    Workaround Resolution

    Below is essentially a workaround that uses the current job execution id to retrieve the associated task execution id. From there, it gets that task execution and passes it to the DeployerPartitionHandler to fulfill its need for a taskExecution reference. It seems to work, but it is still not clear whether there are other side effects (none found during testing so far).

    Full code can be found at https://github.com/danilko/spring-batch-remote-k8s-paritition-example/tree/attempt_2_partitionhandler_with_stepscope_workaround_resolution

    In the partitionHandler method:

        @Bean
        @StepScope
        public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
                                                 JobExplorer jobExplorer,
                                                 @Value("#{stepExecution}") StepExecution stepExecution) throws Exception {
    
    ...
    
          // After the declaration of partitionhandler
            DeployerPartitionHandler partitionHandler =
                    new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                            stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep")
                            , taskRepository);
    
            // Issue https://github.com/spring-cloud/spring-cloud-task/issues/793
            // Set the task execution manually: this handler is no longer created at task level, so @BeforeTask is not invoked on it.
            // DeployerPartitionHandler relies on @BeforeTask to receive the TaskExecution object as part of task setup.
            // With the handler at @StepScope (instead of directly at @Bean level with @EnableTask), that setup is no longer triggered,
            // so the created DeployerPartitionHandler ends up with a null taskExecution.
    
            // Workaround: use the current job execution id to retrieve the associated task execution id,
            // then fetch that task execution and pass it to the handler to fulfill its need for a taskExecution reference.
            // It seems to work, but it is not yet clear whether there are other side effects (none found during testing so far).
            long executionId = taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId());
    
            System.out.println("Current execution job to task execution id " + executionId);
            // Reuse the id looked up above instead of querying it a second time
            TaskExecution taskExecution = taskExplorer.getTaskExecution(executionId);
            System.out.println("Current execution job to task execution is not null: " + (taskExecution != null));
            partitionHandler.beforeTask(taskExecution);
    ...
    
    // rest of code continue
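
The snippet above assumes that a task execution is always found for the current job execution. A minimal sketch of the same lookup chain with explicit empty-result handling is shown below. It uses plain maps as hypothetical stand-ins for the task explorer (`TaskExec`, `link` and `findForJobExecution` are illustrative names, not Spring Cloud Task API), and the ids 22 and 54 are the ones from the log in the question.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class TaskExecutionLookupSketch {

    /** Hypothetical stand-in for a task execution record. */
    static final class TaskExec {
        final long id;

        TaskExec(long id) {
            this.id = id;
        }
    }

    // Hypothetical stand-ins for the two lookups used in the workaround.
    private final Map<Long, Long> taskIdByJobExecutionId = new HashMap<>();
    private final Map<Long, TaskExec> taskExecutionsById = new HashMap<>();

    /** Records that a job execution ran within a task execution. */
    void link(long jobExecutionId, long taskExecutionId) {
        taskIdByJobExecutionId.put(jobExecutionId, taskExecutionId);
        taskExecutionsById.put(taskExecutionId, new TaskExec(taskExecutionId));
    }

    /** Resolves job execution id -> task execution id -> task execution, or empty if either step fails. */
    Optional<TaskExec> findForJobExecution(long jobExecutionId) {
        Long taskId = taskIdByJobExecutionId.get(jobExecutionId); // null when no link exists
        if (taskId == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(taskExecutionsById.get(taskId));
    }

    public static void main(String[] args) {
        TaskExecutionLookupSketch explorer = new TaskExecutionLookupSketch();
        explorer.link(22L, 54L);

        // Known job execution: the task execution is found.
        System.out.println(explorer.findForJobExecution(22L).map(t -> t.id).orElse(-1L));

        // Unknown job execution: the caller gets an empty result instead of a NullPointerException.
        System.out.println(explorer.findForJobExecution(99L).isPresent());
    }
}
```

In the real configuration, the equivalent guard would be to fail with a clear message before calling `beforeTask` when no task execution is associated with the job execution.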
    

    (Note that it uses the step execution to find the name of the currently triggered step and therefore assigns a different worker step. The worker step name in this case comes from the pre-populated job execution context, but it could also come from a job parameter or another place.)

    That job context is populated by a job listener.

    The job is configured with the job listener:
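
As a rough illustration of that lookup, the sketch below maps a manager step name to a key prefix and reads `<prefix><setting>` from a plain `Map` that stands in for the job execution context. The step name `partitionReaderStep` and the keys mirror the ones used in this answer; the class and helper names (`StepSettingsSketch`, `prefixFor`, `setting`) are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

class StepSettingsSketch {

    /** Maps a manager step name to the key prefix used in the job execution context. */
    static String prefixFor(String stepName) {
        return "partitionReaderStep".equalsIgnoreCase(stepName) ? "reader" : "processor";
    }

    /** Looks up "<prefix><setting>" and fails with a clear message if the key is absent. */
    static String setting(Map<String, String> jobContext, String stepName, String setting) {
        String key = prefixFor(stepName) + setting;
        String value = jobContext.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing job context key: " + key);
        }
        return value;
    }

    public static void main(String[] args) {
        // Stand-in for the job execution context filled by the job listener.
        Map<String, String> jobContext = new HashMap<>();
        jobContext.put("readerWorkerStep", "workerStepReader");
        jobContext.put("readerCPURequest", "1");
        jobContext.put("processorWorkerStep", "workerStepProcessor");
        jobContext.put("processorCPURequest", "3");

        System.out.println(setting(jobContext, "partitionReaderStep", "WorkerStep"));
        System.out.println(setting(jobContext, "partitionProcessorStep", "CPURequest"));
    }
}
```

Failing fast on a missing key makes a mistyped key name show up as a clear error rather than as a null setting later on.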

    
        @Bean(name = "partitionedJob")
        @Profile("!worker")
        public Job partitionedJob()throws Exception {
            Random random = new Random();
            return jobBuilderFactory.get("partitionedJob" + random.nextInt())
                    .start(partitionReaderStep())
                    .listener(jobExecutionListener())
                    .next(partitionProcessorStep())
                    .build();
        }
    
    

    The job listener populates it:

    
        @Bean
        public JobExecutionListener jobExecutionListener() {
        JobExecutionListener listener = new JobExecutionListener(){
            @Override
            public void beforeJob(JobExecution jobExecution)
            {
                jobExecution.getExecutionContext().putString("readerCPURequest", "1");
                jobExecution.getExecutionContext().putString("readerCPULimit", "2");
    
                jobExecution.getExecutionContext().putString("readerWorkerGridSize", "1");
    
                // For now using same image for reader/processor, but if it work, can split them
                jobExecution.getExecutionContext().putString("readerWorkerImage", "worker:latest");
                jobExecution.getExecutionContext().putString("readerWorkerStep", "workerStepReader");
    
                jobExecution.getExecutionContext().putString("processorCPURequest", "3");
                jobExecution.getExecutionContext().putString("processorCPULimit", "4");
    
                jobExecution.getExecutionContext().putString("processorWorkerGridSize", "2");
    
                // For now using same image for reader/processor, but if it work, will split them
                jobExecution.getExecutionContext().putString("processorWorkerImage", "worker:latest");
                jobExecution.getExecutionContext().putString("processorWorkerStep", "workerStepProcessor");
    
                // Read back the same key that was set above ("readerWorkerGridSize")
                System.out.println("Set readerWorkerGridSize == " + jobExecution.getExecutionContext().getString("readerWorkerGridSize", "IT IS NULL WHICH IS INCORRECT"));
    
            }
    
            @Override
            public void afterJob(JobExecution jobExecution) {
            }
        };
    
        return listener;
        }
    
    

    Full code (also available in my GitHub repo, with the workaround fix applied): https://github.com/danilko/spring-batch-remote-k8s-paritition-example/blob/main/src/main/java/com/example/batchprocessing/BatchConfiguration.java

    package com.example.batchprocessing;
    
    import io.fabric8.kubernetes.api.model.DeletionPropagation;
    import io.fabric8.kubernetes.api.model.batch.JobList;
    import io.fabric8.kubernetes.api.model.batch.JobSpec;
    import io.fabric8.kubernetes.api.model.batch.JobStatus;
    import io.fabric8.kubernetes.client.KubernetesClient;
    import org.springframework.batch.core.*;
    import org.springframework.batch.core.configuration.JobRegistry;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.core.explore.JobExplorer;
    import org.springframework.batch.core.partition.PartitionHandler;
    import org.springframework.batch.core.partition.support.Partitioner;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.cloud.deployer.resource.docker.DockerResource;
    import org.springframework.cloud.deployer.resource.support.DelegatingResourceLoader;
    import org.springframework.cloud.deployer.spi.kubernetes.*;
    import org.springframework.cloud.deployer.spi.task.TaskLauncher;
    import org.springframework.cloud.task.batch.partition.*;
    import org.springframework.cloud.task.configuration.EnableTask;
    import org.springframework.cloud.task.repository.TaskExecution;
    import org.springframework.cloud.task.repository.TaskExplorer;
    import org.springframework.cloud.task.repository.TaskRepository;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Profile;
    import org.springframework.core.env.Environment;
    import org.springframework.core.env.SystemEnvironmentPropertySource;
    import org.springframework.core.io.Resource;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.core.task.TaskRejectedException;
    import org.springframework.util.StringUtils;
    
    import java.util.*;
    
    
    @Configuration
    @EnableBatchProcessing
    @EnableTask
    public class BatchConfiguration {
    
        private static final int BACK_OFF_LIMIT = 6;
    
        // Prefix for the Kubernetes job name
        private String taskName_prefix="partitionedbatchjob";
    
        @Autowired
        public JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        public StepBuilderFactory stepBuilderFactory;
    
        @Autowired
        public JobExplorer jobExplorer;
    
        @Autowired
        public JobRepository jobRepository;
    
        @Autowired
        public TaskExecutor taskExecutor;
    
        @Autowired
        public TaskRepository taskRepository;
    
        @Autowired
        public TaskExplorer taskExplorer;
    
        @Autowired
        private ConfigurableApplicationContext context;
    
        @Autowired
        private DelegatingResourceLoader resourceLoader;
    
        @Autowired
        private Environment environment;
    
        @Bean
        @StepScope
        public Partitioner partitioner( @Value("#{stepExecution}") StepExecution stepExecution) {
            return new Partitioner() {
                @Override
                public Map<String, ExecutionContext> partition(int gridSize) {
    
                    Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
    
                    int targetGridSize = 0;
                    String step = "";
                    if(stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep"))
                    {
                        step = "reader";
                    }
                    else
                    {
                        step = "processor";
                    }
    
                    targetGridSize = Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize"));
    
                    for (int i = 0; i < targetGridSize; i++) {
                        ExecutionContext context1 = new ExecutionContext();
                        context1.put("partitionNumber", i);
    
                        partitions.put("partition" + i, context1);
                    }
    
                    return partitions;
                }
            };
        }
    
        @Bean
        public KubernetesClient kuberentesClient()
        {
            KubernetesDeployerProperties kubernetesDeployerProperties = new KubernetesDeployerProperties();
    
            return KubernetesClientFactory.getKubernetesClient(kubernetesDeployerProperties);
        }
    
    
        @Bean
        @StepScope
        public TaskLauncher taskLauncher( @Value("#{stepExecution}") StepExecution stepExecution)
        {
            KubernetesDeployerProperties kubernetesDeployerProperties = new KubernetesDeployerProperties();
            kubernetesDeployerProperties.setNamespace("default");
    
            kubernetesDeployerProperties.setCreateJob(true);
    
            // Database setup to reference configmap for database info
            List<KubernetesDeployerProperties.ConfigMapKeyRef> configMapKeyRefList = new ArrayList<KubernetesDeployerProperties.ConfigMapKeyRef>();
            KubernetesDeployerProperties.ConfigMapKeyRef configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
            configMapKeyRef.setConfigMapName("mariadb");
            configMapKeyRef.setDataKey("SPRING_DATASOURCE_URL");
            configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_URL");
            configMapKeyRefList.add(configMapKeyRef);
    
            configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
            configMapKeyRef.setConfigMapName("mariadb");
            configMapKeyRef.setDataKey("SPRING_DATASOURCE_USERNAME");
            configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_USERNAME");
            configMapKeyRefList.add(configMapKeyRef);
    
            configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
            configMapKeyRef.setConfigMapName("mariadb");
            configMapKeyRef.setDataKey("SPRING_DATASOURCE_PASSWORD");
            configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_PASSWORD");
            configMapKeyRefList.add(configMapKeyRef);
    
            configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
            configMapKeyRef.setConfigMapName("mariadb");
            configMapKeyRef.setDataKey("SPRING_DATASOURCE_DRIVERCLASSNAME");
            configMapKeyRef.setEnvVarName("SPRING_DATASOURCE_DRIVERCLASSNAME");
            configMapKeyRefList.add(configMapKeyRef);
    
            configMapKeyRef = new KubernetesDeployerProperties.ConfigMapKeyRef();
            configMapKeyRef.setConfigMapName("mariadb");
            configMapKeyRef.setDataKey("SPRING_PROFILES_ACTIVE");
            configMapKeyRef.setEnvVarName("SPRING_PROFILES_ACTIVE");
            configMapKeyRefList.add(configMapKeyRef);
    
    
            kubernetesDeployerProperties.setConfigMapKeyRefs(configMapKeyRefList);
    
            // Set request resource
            KubernetesDeployerProperties.RequestsResources request = new KubernetesDeployerProperties.RequestsResources();
            KubernetesDeployerProperties.LimitsResources limit = new KubernetesDeployerProperties.LimitsResources();
    
            // Pick the job-context key prefix ("reader" or "processor") for the current manager step
            String step;
            if (stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep")) {
                step = "reader";
            } else {
                step = "processor";
            }
    
            request.setCpu(stepExecution.getJobExecution().getExecutionContext().getString(step + "CPURequest"));
            request.setMemory("2000Mi");
    
    
            limit.setCpu(stepExecution.getJobExecution().getExecutionContext().getString(step +"CPULimit"));
            limit.setMemory("3000Mi");
    
    
            kubernetesDeployerProperties.setRequests(request);
            kubernetesDeployerProperties.setLimits(limit);
    
            // The image is built locally, so only pull it if it is not already present on the node
            kubernetesDeployerProperties.setImagePullPolicy(ImagePullPolicy.IfNotPresent);
    
            // Set task launcher properties to not repeat and not restart
            KubernetesTaskLauncherProperties kubernetesTaskLauncherProperties = new KubernetesTaskLauncherProperties();
    
            // https://kubernetes.io/docs/concepts/workloads/controllers/job/
            // Restart policy is Never, so a failed container is not restarted in place; a new pod is created instead
            kubernetesTaskLauncherProperties.setBackoffLimit(BACK_OFF_LIMIT);
            kubernetesTaskLauncherProperties.setRestartPolicy(RestartPolicy.Never);
            KubernetesTaskLauncher kubernetesTaskLauncher = new KubernetesTaskLauncher(kubernetesDeployerProperties,
                    kubernetesTaskLauncherProperties, kuberentesClient());
    
            return kubernetesTaskLauncher;
        }
    
    
        @Bean(name = "partitionedJob")
        @Profile("!worker")
        public Job partitionedJob() throws Exception {
            Random random = new Random();
            return jobBuilderFactory.get("partitionedJob" + random.nextInt())
                    .start(partitionReaderStep())
                    .listener(jobExecutionListener())
                    .next(partitionProcessorStep())
                    .build();
        }
    
        @Bean(name = "partitionReaderStep")
        public Step partitionReaderStep() throws Exception {
    
            return stepBuilderFactory.get("partitionReaderStep")
                    .partitioner(workerStepReader().getName(), partitioner(null))
                    .step(workerStepReader())
                    .partitionHandler(partitionHandler(
                            taskLauncher(null),
                            jobExplorer, null))
                    .build();
        }
    
        @Bean(name = "partitionProcessorStep")
        public Step partitionProcessorStep() throws Exception {
    
            return stepBuilderFactory.get("partitionProcessorStep")
                    .partitioner(workerStepProcessor().getName(), partitioner(null))
                    .step(workerStepProcessor())
                    .partitionHandler(partitionHandler(
                            taskLauncher(null),
                            jobExplorer, null))
                    .build();
        }
    
    
        @Bean
        @StepScope
        public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
                                                 JobExplorer jobExplorer,
                                                 @Value("#{stepExecution}") StepExecution stepExecution) throws Exception {
    
            // Pick the job-context key prefix ("reader" or "processor") for the current manager step
            String step = "processor";
    
            if (stepExecution.getStepName().equalsIgnoreCase("partitionReaderStep")) {
                step = "reader";
            }
    
            // Use the locally built worker image configured for this step
            DockerResource resource = new DockerResource(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerImage"));
    
    
            DeployerPartitionHandler partitionHandler =
                    new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                            stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerStep")
                            , taskRepository);
    
            // Workaround for https://github.com/spring-cloud/spring-cloud-task/issues/793
            // DeployerPartitionHandler relies on its @BeforeTask callback to receive the TaskExecution during task setup.
            // Because this partition handler is now @StepScope (instead of a plain @Bean in an @EnableTask application),
            // that callback is no longer triggered, so the created DeployerPartitionHandler ends up with a null TaskExecution.
    
            // As a workaround, use the current job execution id to look up the associated task execution id,
            // load that TaskExecution, and pass it to the handler by calling beforeTask(...) manually.
            // This seems to work and no side effects showed up during testing, but it is still not clear whether there are any.
            long executionId = taskExplorer.getTaskExecutionIdByJobExecutionId(stepExecution.getJobExecutionId());
    
            System.out.println("Current execution job to task execution id " + executionId);
            TaskExecution taskExecution = taskExplorer.getTaskExecution(executionId);
            System.out.println("Current execution job to task execution is not null: " + (taskExecution != null));
            partitionHandler.beforeTask(taskExecution);
    
            List<String> commandLineArgs = new ArrayList<>(3);
            commandLineArgs.add("--spring.profiles.active=worker");
            commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
            commandLineArgs.add("--spring.batch.initializer.enabled=false");
            partitionHandler
                    .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
            partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
            partitionHandler.setMaxWorkers(Integer.parseInt(stepExecution.getJobExecution().getExecutionContext().getString(step + "WorkerGridSize")));
    
            partitionHandler.setApplicationName(taskName_prefix + step);
    
            return partitionHandler;
        }
    
        @Bean
        public JobExecutionListener jobExecutionListener() {
            JobExecutionListener listener = new JobExecutionListener() {
                @Override
                public void beforeJob(JobExecution jobExecution) {
                    jobExecution.getExecutionContext().putString("readerCPURequest", "1");
                    jobExecution.getExecutionContext().putString("readerCPULimit", "2");
    
                    jobExecution.getExecutionContext().putString("readerWorkerGridSize", "1");
    
                    // For now the reader and processor use the same image; if this works, they can be split
                    jobExecution.getExecutionContext().putString("readerWorkerImage", "worker:latest");
                    jobExecution.getExecutionContext().putString("readerWorkerStep", "workerStepReader");
    
                    jobExecution.getExecutionContext().putString("processorCPURequest", "3");
                    jobExecution.getExecutionContext().putString("processorCPULimit", "4");
    
                    jobExecution.getExecutionContext().putString("processorWorkerGridSize", "2");
    
                    // For now the reader and processor use the same image; if this works, they will be split
                    jobExecution.getExecutionContext().putString("processorWorkerImage", "worker:latest");
                    jobExecution.getExecutionContext().putString("processorWorkerStep", "workerStepProcessor");
    
                    // Read back the key that was actually stored above ("readerWorkerGridSize", not "readerGridSize")
                    System.out.println("Set readerWorkerGridSize == " + jobExecution.getExecutionContext().getString("readerWorkerGridSize", "IT IS NULL WHICH IS INCORRECT"));
                }
    
                @Override
                public void afterJob(JobExecution jobExecution) {
                }
            };
    
            return listener;
        }
    
        @Bean
        @Profile("worker")
        public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
            return new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
        }
    
        @Bean(name = "workerStepReader")
        public Step workerStepReader() {
            return this.stepBuilderFactory.get("workerStepReader")
                    .tasklet(workerTaskletReader(null))
                    .build();
        }
    
        @Bean(name = "workerStepProcessor")
        public Step workerStepProcessor() {
            return this.stepBuilderFactory.get("workerStepProcessor")
                    .tasklet(workerTaskletProcessor(null))
                    .build();
        }
    
    
    
        @Bean
        @StepScope
        public Tasklet workerTaskletReader(
                final @Value("#{stepExecution}") StepExecution stepExecution) {
    
            return new Tasklet() {
                @Override
                public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                    Integer partitionNumber = stepExecution.getExecutionContext().getInt("partitionNumber");
                    System.out.println("This workerTaskletReader ran partition: " + partitionNumber);
    
                    return RepeatStatus.FINISHED;
                }
            };
        }
    
        @Bean
        @StepScope
        public Tasklet workerTaskletProcessor(
                final @Value("#{stepExecution}") StepExecution stepExecution) {
    
            return new Tasklet() {
                @Override
                public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                    Integer partitionNumber = stepExecution.getExecutionContext().getInt("partitionNumber");
                    System.out.println("This workerTaskletProcessor ran partition: " + partitionNumber);
    
                    return RepeatStatus.FINISHED;
                }
            };
        }
    }
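
To make the per-step lookup in the configuration above easier to follow, here is a minimal sketch of just that part in plain Java. It is not the actual Spring code: a `Map` stands in for the job `ExecutionContext`, and the class `StepSettingsSketch` and its methods `prefixFor` and `setting` are names I made up for illustration. The key names (`readerCPURequest`, `processorWorkerGridSize`, etc.) are the same ones set in `jobExecutionListener()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring Batch or Spring Cloud Task.
final class StepSettingsSketch {

    // Maps a manager step name to the key prefix used in the job context.
    // Anything other than "partitionReaderStep" falls back to "processor",
    // which mirrors the if/else in the configuration above.
    static String prefixFor(String stepName) {
        return "partitionReaderStep".equalsIgnoreCase(stepName) ? "reader" : "processor";
    }

    // Looks up "<prefix><suffix>" and fails loudly when the key is missing,
    // so a mistyped key shows up immediately instead of as a null later on.
    static String setting(Map<String, String> jobContext, String stepName, String suffix) {
        String key = prefixFor(stepName) + suffix;
        String value = jobContext.get(key);
        if (value == null) {
            throw new IllegalStateException("Missing job context key: " + key);
        }
        return value;
    }

    public static void main(String[] args) {
        // Stand-in for the values stored by the JobExecutionListener.
        Map<String, String> jobContext = new HashMap<>();
        jobContext.put("readerCPURequest", "1");
        jobContext.put("readerWorkerGridSize", "1");
        jobContext.put("processorCPURequest", "3");
        jobContext.put("processorWorkerGridSize", "2");

        System.out.println(setting(jobContext, "partitionReaderStep", "CPURequest"));        // prints 1
        System.out.println(setting(jobContext, "partitionProcessorStep", "WorkerGridSize")); // prints 2
    }
}
```

The idea is that both `taskLauncher` and `partitionHandler` derive the same prefix from the step name, so each manager step only sees its own CPU, image, and grid size settings.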