I am trying to use the multiline IteamReader following the spring-batch-sample at https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples#multiline
I am running into compilation error as below -
I am sure there is something related to generics as it looking for class implementing ItemReader but the AggregateItemReader implements ItemReader<List>.
public class AggregateItemReader<T> implements ItemReader<List<T>> {
you can find my code here - https://github.com/arpit9mittal/spring-batch-demo/blob/master/src/main/java/my/demo/batch/BatchConfiguration.java
UPDATE:
I suppressed the generics and updated the AggregateItemReader as below inorder to call ItemStreamReader open() method.
public class AggregateItemReader<T> implements ItemStreamReader<List<T>> {
private static final Log LOG = LogFactory.getLog(AggregateItemReader.class);
private ItemStreamReader<AggregateItem<T>> itemReader;
I noticed that the ItemWriter is writing lists of record instead of record per line
[Trade: [isin=UK21341EAH45,quantity=978,price=98.34,customer=customer1], Trade: [isin=UK21341EAH46,quantity=112,price=18.12,customer=customer2]]
[Trade: [isin=UK21341EAH47,quantity=245,price=12.78,customer=customer3], Trade: [isin=UK21341EAH48,quantity=108,price=9.25,customer=customer4], Trade: [isin=UK21341EAH49,quantity=854,price=23.39,customer=customer5]]
[Trade: [isin=UK21341EAH47,quantity=245,price=12.78,customer=customer6], Trade: [isin=UK21341EAH48,quantity=108,price=9.25,customer=customer7], Trade: [isin=UK21341EAH49,quantity=854,price=23.39,customer=customer8]]
AND When i try to add a processor, it complains that processor cannot convert the list into Trade object.
@Bean
public ItemProcessor<Trade, Trade> processor() {
return new ItemProcessor<Trade, Trade>() {
@Override
public Trade process(Trade item) throws Exception {
item.setProcessed(true);
return item;
}
};
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Step multilineStep(
AggregateItemReader reader,
ItemProcessor processor,
FlatFileItemWriter writer,
StepItemReadListener itemReadListener) {
return stepBuilderFactory.get("multiLineStep")
.chunk(1)
.reader(reader)
.writer(writer)
.processor(processor)
.build();
}
ERROR:
java.lang.ClassCastException: java.util.ArrayList cannot be cast to my.demo.batch.multiline.Trade
at my.demo.batch.BatchConfiguration$2.process(BatchConfiguration.java:1) ~[main/:na]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.3.3.jar:4.3.3]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.3.3.jar:4.3.3]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.3.3.jar:4.3.3]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.3.3.jar:4.3.3]
HELP:
You need to be consistent in what the type of the items is that you want to handle on batch level. According to your step definition it is Trade
. By calling <Trade, Trade>chunk(1)
on the step builder, you declare that your batch should read items of type Trade
with a chunk size of 1 (i.e. one at a time) and pass these on to a writer for items of type Trade
. In this case, you need to supply a reader of type ItemReader<Trade>
, a writer of type ItemWriter<Trade>
and optionally a processor of type ItemProcessor<Trade, Trade>
.
The problem is that your reader is of type ItemReader<List<Trade>>
, i.e. it does not yield a Trade
for each invocation of its read
method but a list of trades.
If you want to use the AggregateItemReader
you need to wrap it into a custom reader that works as an adapter and actually returns Trade
items and not List<Trade>
.
For example, the custom read
method could look like this:
public Trade read() throws Exception {
if (queue.isEmpty()) {
List<Trade> trades = aggregateItemReader.read();
if (trades != null) {
queue.addAll(trades);
}
}
return queue.poll();
}
with queue
initialized as
private Deque<Trade> queue = new ArrayDeque<>();