
Spring Batch Partitioning inject stepExecutionContext parameter in itemReader


I am trying to learn Spring Batch partitioning with a Partitioner.

I need to set the file names dynamically in my Partitioner implementation and read them in the itemReader, but the filename arrives there as null.

My Spring Batch configuration:

@Bean
@StepScope
public ItemReader<Transaction> itemReader(@Value("#{stepExecutionContext[filename]}") String filename) 
    throws UnexpectedInputException, ParseException {
    FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    String[] tokens = { "username", "userid", "transactiondate", "amount" };
    tokenizer.setNames(tokens);
    reader.setResource(new ClassPathResource(
        "input/"+filename));
    DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<Transaction>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
    reader.setLinesToSkip(1);
    reader.setLineMapper(lineMapper);
    return reader;
}

@Bean(name = "partitioningJob")
public Job partitioningJob() throws UnexpectedInputException, MalformedURLException, ParseException {  
    return jobs.get("partitioningJob").listener(jobListener()).start(partitionStep()).build();  
}  

@Bean 
public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {  
    return steps.get("partitionStep").partitioner(step2()).partitioner("step2", partitioner()).gridSize(2).taskExecutor(taskExecutor).build();  
}  

@Bean 
public Step step2() throws UnexpectedInputException, MalformedURLException, ParseException {  
    return steps.get("step2").<Transaction, Transaction> chunk(1).reader(itemReader(null)).processor(itemProcessor()).writer(itemWriter(marshaller(),null)).build();  
}  

@Bean 
public TransactionPartitioner partitioner() {  
    TransactionPartitioner partitioner = new TransactionPartitioner();  
    return partitioner;  
}                           

@Bean 
public JobListener jobListener() {  
   return new JobListener();  
} 

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setMaxPoolSize(2);
    taskExecutor.setQueueCapacity(2);
    taskExecutor.setCorePoolSize(2);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}

And my TransactionPartitioner class is:

public class TransactionPartitioner implements Partitioner {

    public Map<String, ExecutionContext> partition(int range) {
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        for (int i = 1; i <= range; i++) {
            ExecutionContext exContext = new ExecutionContext();
            exContext.put("filename", "input" + i + ".csv");
            exContext.put("name", "Thread" + i);
            result.put("partition" + i, exContext);
        }
        return result;
    }
}

Is this not the right way to do it? Any suggestions would be appreciated.

This is the stack trace:

  18:23:39.060 [main] DEBUG org.springframework.batch.core.job.AbstractJob - Upgrading JobExecution status: StepExecution: id=1, version=2, name=partitionStep, status=FAILED, exitStatus=FAILED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=0, exitDescription=org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
    at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:392)
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
    at com.sun.proxy.$Proxy19.run(Unknown Source)
    at org.baeldung.spring_batch_intro.App.main(App.java:24)
; org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:147)
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:96)
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:310)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:197)
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139)
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Input resource must exist (reader is in 'strict' mode): class path resource [input/null]
    at org.springframework.batch.item.file.FlatFileItemReader.doOpen(FlatFileItemReader.java:251)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:144)
    ... 9 more

As per @Sabir's suggestion, I checked my data. The step context table looks like this:

| STEP_EXECUTION_ID | SHORT_CONTEXT | SERIALIZED_CONTEXT |
| 1 | {"map":[{"entry":[{"string":"SimpleStepExecutionSplitter.GRID_SIZE","long":2},{"string":["batch.stepType","org.springframework.batch.core.partition.support.PartitionStep"]}]}]} | NULL |
| 2 | {"map":[{"entry":[{"string":["filename","input2.csv"]},{"string":["name","Thread2"]}]}]} | NULL |
| 3 | {"map":[{"entry":[{"string":["filename","input1.csv"]},{"string":["name","Thread1"]}]}]} |

Here is the full code for it: https://drive.google.com/file/d/0Bziay9b2ceLbUXdTRnZoSjRfR2s/view?usp=sharing


Solution

  • I went through your code and tried to run it.

    Currently, the file name is not being bound at step scope.

    You have two configuration files:

    1. SpringConfig - containing Spring related config beans
    2. SpringBatchConfig - containing Spring batch related beans

    The first one carries the @EnableBatchProcessing and @Configuration annotations.

    But the itemReader is defined in the other config file, which does not carry either annotation.

    You should put @Configuration on the other file too.
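
As a rough sketch of this first option, the second config file could look something like the class below. It assumes the class is named SpringBatchConfig as listed above and that @EnableBatchProcessing stays on SpringConfig. The String item type and the PassThroughLineMapper are simplifications so the sketch compiles on its own; the tokenizer and field set mapper from the question would go in their place. It needs Spring Batch on the classpath.

```java
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

// @Configuration makes Spring intercept calls between @Bean methods in this
// class, so a call such as itemReader(null) returns the step-scoped bean
// instead of running the method body with a literal null.
@Configuration
public class SpringBatchConfig {

    // Returning the concrete reader type (rather than ItemReader) keeps the
    // ItemStream methods visible on the step-scoped proxy.
    @Bean
    @StepScope
    public FlatFileItemReader<String> itemReader(
            @Value("#{stepExecutionContext['filename']}") String filename) {
        FlatFileItemReader<String> reader = new FlatFileItemReader<>();
        // filename is resolved per partition from the step execution context.
        reader.setResource(new ClassPathResource("input/" + filename));
        reader.setLinesToSkip(1);
        // Simplified mapper (assumption): each line is passed through as a String.
        reader.setLineMapper(new PassThroughLineMapper());
        return reader;
    }
}
```

With a class annotated this way, the itemReader(null) call in step2() only serves to satisfy the compiler; the actual value should come from the partition's execution context when the step runs.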

    OR

    You can add both annotations to the SpringBatchConfig file and skip them in Spring

    Without this, the configuration class is not processed properly: the itemReader is not treated as step scoped (that is, the @StepScope annotation has no effect), so the values are not bound at step level and you get null.
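
To make the difference concrete, here is a plain-Java analogy of eager versus late binding. It is only an illustration, not how Spring implements step scope, and the names LateBindingDemo, resourcePath and lateBound are made up for this sketch. The direct call mirrors what happens when itemReader(null) runs as an ordinary method, and the late-bound variant mirrors a reader whose filename is looked up from each partition's context.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class LateBindingDemo {

    // Stand-in for the reader factory method: the path is fixed at call time.
    static String resourcePath(String filename) {
        return "input/" + filename;
    }

    // Stand-in for a step-scoped bean: the filename is looked up only when a
    // step context is supplied, so every partition gets its own value.
    static Function<Map<String, Object>, String> lateBound() {
        return stepContext -> resourcePath((String) stepContext.get("filename"));
    }

    public static void main(String[] args) {
        // Direct call with a literal null: the null ends up in the path,
        // matching "class path resource [input/null]" in the stack trace.
        System.out.println(resourcePath(null));

        // Late binding: the partition's context supplies the filename.
        Map<String, Object> partition1 = new HashMap<>();
        partition1.put("filename", "input1.csv");
        System.out.println(lateBound().apply(partition1));
    }
}
```

Running main prints input/null for the direct call and input/input1.csv for the late-bound one.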