Tags: spring, spring-boot, spring-batch, aop

Defining Spring Batch Steps programmatically


I'm implementing an application with several dozen steps that are mostly the same and differ only in the parameters given to them. To do that, I wrote an abstraction that creates the necessary reader/processor/writer and step from a definition class supplied by the implementing developer. It looks like this:

    /**
     * Example step definition consumed by StepRegistrar. It reads ExampleSource rows, maps each item
     * in the processor lambda, and writes the result with the INSERT statement below (which uses
     * :id / :name placeholders).
     *
     * NOTE(review): the Reader arguments are presumably (entity type, select columns, from, where,
     * order key). That is the order in which StepRegistrar reads them via reader().entityType(),
     * select(), from(), where() and orderKey(). TODO confirm against the Reader constructor.
     * NOTE(review): the processor creates a BankEntity although the declared output type is
     * ExampleTarget. This only compiles if BankEntity is an ExampleTarget; it looks like a leftover
     * from anonymising the example, so confirm.
     */
    @Bean
public StepDefinition<ExampleSource, ExampleTarget> exampleDefinition() {
    return new StepDefinition<>(
            new Reader<>(ExampleSource.class, "id,name", "table", null, "id"),
            item -> new BankEntity(item.getId(), item.getName(), "", new HashSet<>()),
            new Writer<>(ExampleTarget.class, """
                    INSERT INTO target (id,name) VALUES (:id, :name)
                    """));
}

/**
 * Registers a reader factory, a writer factory and a step factory bean definition for every
 * {@code StepDefinition} bean available from the bean factory.
 *
 * <p>Reader and writer factories are registered in the {@code "step"} scope. Values that depend on
 * job parameters are therefore passed as late-binding SpEL expressions ({@code #{jobParameters[...]}}),
 * which the step scope evaluates when the bean is actually created while a step runs. They are
 * not read from the {@link Environment}, which never contains job parameters.
 *
 * <p>NOTE(review): this relies on Spring Batch's StepScope wrapping "step"-scoped definitions in
 * scoped proxies (its auto-proxy behaviour), so the singleton step factory can reference them at
 * startup. If auto-proxying is disabled, wrap them with {@code ScopedProxyUtils.createScopedProxy}
 * as shown in the answer's example.
 */
public class StepRegistrar implements ImportBeanDefinitionRegistrar, BeanFactoryAware, EnvironmentAware {

    /** Chunk size used for every generated step. */
    private static final long DEFAULT_CHUNK_SIZE = 10_000L;

    /**
     * Late-binding expression for the reader's "last update" argument. It is resolved by the step
     * scope when the reader factory is instantiated, and falls back to 0 when the job was launched
     * without a {@code lastUpdate} parameter (the same default the previous code used).
     */
    private static final String LAST_UPDATE_EXPRESSION = "#{jobParameters['lastUpdate'] ?: 0L}";

    // Kept to honour EnvironmentAware. Job parameters are NOT available through the Environment.
    private Environment env;

    private BeanFactory beanFactory;

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    @Override
    public void setEnvironment(Environment environment) {
        this.env = environment;
    }

    /**
     * Registers the reader/writer/step bean definitions for each {@code StepDefinition} bean.
     *
     * @param importingClassMetadata metadata of the importing configuration class (unused)
     * @param registry the registry the generated definitions are added to
     * @param importBeanNameGenerator unused; bean names are derived from the entity types
     */
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry,
            BeanNameGenerator importBeanNameGenerator) {
        // getBean(StepDefinition.class) throws NoUniqueBeanDefinitionException as soon as a second
        // definition exists, so iterate over all of them instead.
        for (StepDefinition<?, ?> stepDefinition : beanFactory.getBeanProvider(StepDefinition.class)) {
            registerStep(stepDefinition, registry);
        }
    }

    /** Intentionally empty: all registration happens in the overload taking a BeanNameGenerator. */
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    }

    /**
     * Registers the reader factory, writer factory and step factory definitions for one step definition.
     *
     * <p>NOTE(review): bean names are derived only from the entities' simple class names, so two
     * definitions sharing the same reader (or writer) entity type would produce the same bean name.
     */
    private void registerStep(StepDefinition<?, ?> stepDefinition, BeanDefinitionRegistry registry) {
        String readerEntity = stepDefinition.reader().entityType().getSimpleName();
        String writerEntity = stepDefinition.writer().entityType().getSimpleName();

        String readerName = StringUtils.uncapitalize(String.format("%sReaderFactory", readerEntity));
        String writerName = StringUtils.uncapitalize(String.format("%sWriterFactory", writerEntity));
        String stepName = StringUtils.uncapitalize(String.format("%sTo%sStepFactory", readerEntity, writerEntity));

        AbstractBeanDefinition readerFactoryBean = createReaderFactoryBean(readerName,
                stepDefinition.reader().entityType(), stepDefinition.reader().select(),
                stepDefinition.reader().from(), stepDefinition.reader().where(), stepDefinition.reader().orderKey());
        registry.registerBeanDefinition(readerName, readerFactoryBean);

        AbstractBeanDefinition writerFactoryBean = createWriterFactoryBean(stepDefinition.writer().sql());
        registry.registerBeanDefinition(writerName, writerFactoryBean);

        AbstractBeanDefinition stepBean = createStepFactoryBean(stepName, DEFAULT_CHUNK_SIZE, readerName,
                stepDefinition.supplier(), writerName);
        registry.registerBeanDefinition(stepName, stepBean);
    }

    /**
     * Builds a step-scoped definition for a {@code BiDataReaderFactoryBean}.
     *
     * <p>The "last update" argument is the SpEL string {@link #LAST_UPDATE_EXPRESSION}. Spring
     * evaluates it when the bean is created inside the step scope, which is the only place where
     * {@code jobParameters} can be resolved.
     */
    private <T> AbstractBeanDefinition createReaderFactoryBean(String name, Class<T> entityType, String select,
            String from, String where, String orderBy) {
        return BeanDefinitionBuilder.rootBeanDefinition(BiDataReaderFactoryBean.class)
                .addConstructorArgValue(name)
                .addConstructorArgValue(select)
                .addConstructorArgValue(from)
                .addConstructorArgValue(where)
                .addConstructorArgValue(orderBy)
                .addConstructorArgValue(LAST_UPDATE_EXPRESSION)
                .addConstructorArgReference("biDataSource")
                .addConstructorArgValue(entityType)
                .setScope("step")
                .getBeanDefinition();
    }

    /** Builds a step-scoped definition for a {@code BiDataWriterFactoryBean} writing to "dataSource". */
    private AbstractBeanDefinition createWriterFactoryBean(String sql) {
        return BeanDefinitionBuilder.rootBeanDefinition(BiDataWriterFactoryBean.class)
                .addConstructorArgValue(sql)
                .addConstructorArgReference("dataSource")
                .setScope("step")
                .getBeanDefinition();
    }

    /**
     * Builds a (default-scoped) {@code StepFactoryBean} definition wiring the named reader and writer
     * beans, the given processor, and the "jobRepository", "transactionManager" and
     * "batchTaskExecutor" beans.
     */
    private AbstractBeanDefinition createStepFactoryBean(String stepName, long chunkSize, String readerName,
            Object processor, String writerName) {
        return BeanDefinitionBuilder.rootBeanDefinition(StepFactoryBean.class)
                .addConstructorArgValue(stepName)
                .addConstructorArgReference("jobRepository")
                .addConstructorArgValue(chunkSize)
                .addConstructorArgReference(readerName)
                .addConstructorArgValue(processor)
                .addConstructorArgReference(writerName)
                .addConstructorArgReference("transactionManager")
                .addConstructorArgReference("batchTaskExecutor")
                .getBeanDefinition();
    }

}

This mostly works: the beans are registered correctly, EXCEPT that I can't scope them to the "step" scope and access `env.getProperty("jobParameters['lastUpdate']", Long.class, 0L)`, because the job parameters are only available while the step is executed inside a job. It seems that Spring Batch does some kind of proxying around these beans to delay their actual initialisation until the step is executed. Can this kind of proxying also be done programmatically?


Solution

  • As already hinted at in the comments to your question, not everything needs to be a bean. Using beans for beans' sake often leads to massive overcomplication.

    In case you want to use Spring Cloud Task, your Jobs are expected to be beans for its auto-configuration. But even for SCT, it's not strictly required.

    If you want to expose your jobs as beans, please consider doing more straightforward job assembly in the factory methods for the job beans. You could also have a look at Spring Batch's JobRegistry as an even less bean-centric alternative, which nevertheless works with features like Spring Boot auto-configuration.

    A good imperative alternative to get to the job parameters is implementing JobExecutionListener or StepExecutionListener and retrieving the job parameters from the execution contexts.

    Having said that, the following Spring Boot 3 app logs Hello World when started with the command line argument parameter=World:

    /**
     * Demo Spring Boot 3 application. It registers a job-scoped {@link Step} programmatically and injects
     * the {@code parameter} job parameter into it through a late-binding SpEL expression.
     */
    @SpringBootApplication
    class CrazyApplication {
    
      private static final Logger LOGGER = LoggerFactory.getLogger(CrazyApplication.class);
    
      public static void main(String[] args) {
        SpringApplication.run(CrazyApplication.class, args);
      }
    
      /** Single-step job; {@code step} is the scoped proxy registered by {@link StepRegistrar}. */
      @Bean
      Job job(JobRepository jobRepository, Step step) {
        return new JobBuilder("job", jobRepository).start(step).build();
      }
    
      /**
       * Registers two bean definitions during bean-definition registration.
       * "parameterString" is a job-scoped String whose value is the SpEL expression
       * {@code #{jobParameters['parameter']}}, so it is only evaluated when the job scope is active.
       * "step" is a scoped proxy in front of a job-scoped {@link StepFactoryBean}. This lets the
       * singleton {@code job} bean be wired at startup, although the real Step is created per job execution.
       */
      @Component
      static class StepRegistrar implements BeanDefinitionRegistryPostProcessor {
    
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
          // Nothing to do: all work happens at the registry level.
        }
    
        @Override
        public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
          var parameterBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(String.class)
              .addConstructorArgValue("#{jobParameters['parameter']}")
              .setScope("job")
              .getBeanDefinition();
          registry.registerBeanDefinition("parameterString", parameterBeanDefinition);
          var beanDefinition = BeanDefinitionBuilder.genericBeanDefinition(StepFactoryBean.class)
              .addConstructorArgReference("jobRepository")
              .addConstructorArgReference("transactionManager")
              .addConstructorArgReference("parameterString")
              .setScope("job")
              .getBeanDefinition();
          // Registers the job-scoped target under "scopedTarget.step" and returns a proxy definition named "step".
          var beanDefinitionHolder = ScopedProxyUtils.createScopedProxy(
              new BeanDefinitionHolder(beanDefinition, "step"),
              registry,
              true
          );
          registry.registerBeanDefinition(beanDefinitionHolder.getBeanName(), beanDefinitionHolder.getBeanDefinition());
        }
    
      }
    
      /** Builds a tasklet step that logs "Hello {parameter}" using the injected job parameter. */
      static class StepFactoryBean implements FactoryBean<Step> {
    
        private final JobRepository jobRepository;
        private final PlatformTransactionManager transactionManager;
        private final String parameter;
    
        StepFactoryBean(JobRepository jobRepository, PlatformTransactionManager transactionManager, String parameter) {
          this.jobRepository = jobRepository;
          this.transactionManager = transactionManager;
          this.parameter = parameter;
        }
    
        @Override
        public Step getObject() {
          return new StepBuilder("step", jobRepository)
              .tasklet((contribution, chunkContext) -> {
                  // Parameterized logging: the message is only formatted if INFO is enabled.
                  LOGGER.info("Hello {}", parameter);
                  return RepeatStatus.FINISHED;
              }, transactionManager)
              .build();
        }
    
        @Override
        public Class<?> getObjectType() {
          return Step.class;
        }
    
      }
    
    }