I am trying to learn Spring Batch with a Partitioner.
The issue is that I need to set the filenames dynamically from the Partitioner implementation, and I am trying to read them in the itemReader. But the filename comes through as null.
My Spring Batch configuration:
@Bean
@StepScope
public ItemReader<Transaction> itemReader(@Value("#{stepExecutionContext[filename]}") String filename)
        throws UnexpectedInputException, ParseException {
    FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    String[] tokens = { "username", "userid", "transactiondate", "amount" };
    tokenizer.setNames(tokens);
    reader.setResource(new ClassPathResource("input/" + filename));
    DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<Transaction>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
    reader.setLinesToSkip(1);
    reader.setLineMapper(lineMapper);
    return reader;
}

@Bean(name = "partitioningJob")
public Job partitioningJob() throws UnexpectedInputException, MalformedURLException, ParseException {
    return jobs.get("partitioningJob").listener(jobListener()).start(partitionStep()).build();
}

@Bean
public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("partitionStep")
            .partitioner(step2())
            .partitioner("step2", partitioner())
            .gridSize(2)
            .taskExecutor(taskExecutor)
            .build();
}

@Bean
public Step step2() throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("step2")
            .<Transaction, Transaction> chunk(1)
            .reader(itemReader(null))
            .processor(itemProcessor())
            .writer(itemWriter(marshaller(), null))
            .build();
}

@Bean
public TransactionPartitioner partitioner() {
    TransactionPartitioner partitioner = new TransactionPartitioner();
    return partitioner;
}

@Bean
public JobListener jobListener() {
    return new JobListener();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setMaxPoolSize(2);
    taskExecutor.setQueueCapacity(2);
    taskExecutor.setCorePoolSize(2);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}
And my TransactionPartitioner class is:
public class TransactionPartitioner implements Partitioner {

    public Map<String, ExecutionContext> partition(int range) {
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        for (int i = 1; i <= range; i++) {
            ExecutionContext exContext = new ExecutionContext();
            exContext.put("filename", "input" + i + ".csv");
            exContext.put("name", "Thread" + i);
            result.put("partition" + i, exContext);
        }
        return result;
    }
}
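To check what this loop produces independently of Spring, the same logic can be run in plain Java. This is only a minimal sketch: `PartitionSketch` is a hypothetical class name, and a plain `Map<String, Object>` stands in for Spring Batch's `ExecutionContext`.

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionSketch {

    // Mirrors the loop in TransactionPartitioner.partition, but uses a plain
    // Map<String, Object> as a stand-in for ExecutionContext (assumption).
    static Map<String, Map<String, Object>> partition(int gridSize) {
        Map<String, Map<String, Object>> result = new HashMap<>();
        for (int i = 1; i <= gridSize; i++) {
            Map<String, Object> context = new HashMap<>();
            context.put("filename", "input" + i + ".csv");
            context.put("name", "Thread" + i);
            result.put("partition" + i, context);
        }
        return result;
    }

    public static void main(String[] args) {
        // gridSize(2) on the partition step corresponds to partition(2) here.
        partition(2).forEach((name, context) ->
                System.out.println(name + " -> " + context.get("filename")));
    }
}
```

Each partition gets its own `filename` entry, so if the reader still sees null, the value is probably being lost at binding time rather than in the partitioner.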
Is this not the right way to do it? Please suggest.
This is the stack trace:
18:23:39.060 [main] DEBUG org.springframework.batch.core.job.AbstractJob - Upgrading JobExecution status: StepExecution: id=1, version=2, name=partitionStep, status=FAILED, exitStatus=FAILED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=0, rollbackCount=0, exitDescription=org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:392)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy19.run(Unknown Source)
at org.baeldung.spring_batch_intro.App.main(App.java:24)
; org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:147)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:96)
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:310)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:197)
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139)
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Input resource must exist (reader is in 'strict' mode): class path resource [input/null]
at org.springframework.batch.item.file.FlatFileItemReader.doOpen(FlatFileItemReader.java:251)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:144)
... 9 more
As per @Sabir's suggestion, I checked my data. The step context table looks like this:
| STEP_EXECUTION_ID | SHORT_CONTEXT | SERIALIZED_CONTEXT |
| 1 | {"map":[{"entry":[{"string":"SimpleStepExecutionSplitter.GRID_SIZE","long":2},{"string":["batch.stepType","org.springframework.batch.core.partition.support.PartitionStep"]}]}]} | NULL |
| 2 | {"map":[{"entry":[{"string":["filename","input2.csv"]},{"string":["name","Thread2"]}]}]} | NULL |
| 3 | {"map":[{"entry":[{"string":["filename","input1.csv"]},{"string":["name","Thread1"]}]}]}
Here is the full code for it: https://drive.google.com/file/d/0Bziay9b2ceLbUXdTRnZoSjRfR2s/view?usp=sharing
I went through your code and tried to run it.
Currently, the file name is not being bound at step scope.
You have two configuration files:
The first one contains the annotations @EnableBatchProcessing and @Configuration.
But the itemReader is defined in the other config file, which does not contain either of these annotations.
You should have @Configuration on the other file too.
OR
You can add both annotations to the SpringBatchConfig config file and skip them in Spring
Without this, the configuration is not processed properly: the itemReader is not treated as step scoped (i.e. the @StepScope annotation has no effect), the values are not bound at step level, and hence you are getting the null values.
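As a rough sketch of the first option, the class that declares the reader might look like the following. It assumes that class is the SpringBatchConfig mentioned above, and that Transaction and RecordFieldSetMapper are the question's own classes in the same package. Declaring the concrete FlatFileItemReader return type is a choice made for this sketch, so that the step-scoped proxy also exposes the ItemStream methods; it is not something the fix above requires.

```java
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

// @Configuration makes Spring process this class as a full configuration
// class, so calls between its @Bean methods go through the container.
@Configuration
public class SpringBatchConfig {

    @Bean
    @StepScope
    public FlatFileItemReader<Transaction> itemReader(
            @Value("#{stepExecutionContext['filename']}") String filename) {
        // Transaction and RecordFieldSetMapper come from the question's
        // project and are assumed to live in this package.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "username", "userid", "transactiondate", "amount" });

        DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<Transaction>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new RecordFieldSetMapper());

        FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
        // filename is resolved from the partition's step execution context
        // when the step actually runs.
        reader.setResource(new ClassPathResource("input/" + filename));
        reader.setLinesToSkip(1);
        reader.setLineMapper(lineMapper);
        return reader;
    }

    // The job, step, partitioner and task executor beans from the question
    // would stay in this class unchanged.
}
```

With @Configuration in place, the itemReader(null) call inside step2() should resolve to the step-scoped bean instead of running as a plain Java method call, so the null argument acts only as a placeholder and no longer ends up in the resource path as input/null.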