spring, spring-batch, multiline

Multiline AggregateItemReader not working as suggested in spring-batch-samples


I am trying to use the multiline ItemReader, following the spring-batch-samples example at https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples#multiline

I am running into a compilation error:

[image: screenshot of the compilation error]

I am sure it is related to generics: the step builder is looking for a class implementing ItemReader for the item type, but AggregateItemReader implements ItemReader<List<T>>.

public class AggregateItemReader<T> implements ItemReader<List<T>> {

You can find my code here: https://github.com/arpit9mittal/spring-batch-demo/blob/master/src/main/java/my/demo/batch/BatchConfiguration.java

UPDATE:

I suppressed the generics warnings and changed AggregateItemReader as below so that the ItemStreamReader open() method gets called.

public class AggregateItemReader<T> implements ItemStreamReader<List<T>> {
private static final Log LOG = LogFactory.getLog(AggregateItemReader.class);

private ItemStreamReader<AggregateItem<T>> itemReader;

I noticed that the ItemWriter now writes a whole list of records per line instead of one record per line:

[Trade: [isin=UK21341EAH45,quantity=978,price=98.34,customer=customer1], Trade: [isin=UK21341EAH46,quantity=112,price=18.12,customer=customer2]]
[Trade: [isin=UK21341EAH47,quantity=245,price=12.78,customer=customer3], Trade: [isin=UK21341EAH48,quantity=108,price=9.25,customer=customer4], Trade: [isin=UK21341EAH49,quantity=854,price=23.39,customer=customer5]]
[Trade: [isin=UK21341EAH47,quantity=245,price=12.78,customer=customer6], Trade: [isin=UK21341EAH48,quantity=108,price=9.25,customer=customer7], Trade: [isin=UK21341EAH49,quantity=854,price=23.39,customer=customer8]]

And when I try to add a processor, it complains that the list cannot be cast to a Trade object.

@Bean
public ItemProcessor<Trade, Trade> processor() {
    return new ItemProcessor<Trade, Trade>() {

        @Override
        public Trade process(Trade item) throws Exception {
            item.setProcessed(true);
            return item;
        }
    };
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Step multilineStep(
        AggregateItemReader reader, 
        ItemProcessor processor,
        FlatFileItemWriter writer,
        StepItemReadListener itemReadListener) {
    return stepBuilderFactory.get("multiLineStep")
            .chunk(1)
            .reader(reader)
            .writer(writer)
            .processor(processor)
            .build();
}

ERROR:

java.lang.ClassCastException: java.util.ArrayList cannot be cast to my.demo.batch.multiline.Trade
    at my.demo.batch.BatchConfiguration$2.process(BatchConfiguration.java:1) ~[main/:na]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.3.3.jar:4.3.3]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.3.3.jar:4.3.3]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.3.3.jar:4.3.3]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.3.3.jar:4.3.3]

HELP:

  1. How can we make it work without suppressing generics?
  2. How can we ensure that the ItemReader returns the list in the same way as it does with chunk processing, so that the ItemProcessor and ItemWriter work as usual?
  3. Is it possible to do so without extending SimpleStepBuilder and SimpleChunkProvider?

Solution

  • You need to be consistent about the type of item you handle at the batch level. According to your step definition it is Trade. By calling <Trade, Trade>chunk(1) on the step builder, you declare that the step reads items of type Trade with a chunk size of 1 (i.e. one at a time) and passes them on to a writer for items of type Trade. In that case you need to supply a reader of type ItemReader<Trade>, a writer of type ItemWriter<Trade> and, optionally, a processor of type ItemProcessor<Trade, Trade>.

    The problem is that your reader is of type ItemReader<List<Trade>>, i.e. each invocation of its read method yields a list of trades rather than a single Trade. With raw types the compiler no longer catches this mismatch, so it only surfaces at runtime as the ClassCastException in your processor.

    If you want to use the AggregateItemReader, you need to wrap it in a custom reader that acts as an adapter and returns Trade items rather than List<Trade>.

    For example, the custom read method could look like this:

    public Trade read() throws Exception {
        if (queue.isEmpty()) {
            List<Trade> trades = aggregateItemReader.read();
            if (trades != null) {
                queue.addAll(trades);
            }
        }
        return queue.poll();
    }
    

    with queue initialized as

    private Deque<Trade> queue = new ArrayDeque<>();
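
To show how the read method and the queue fit together, here is a minimal sketch of a complete adapter class. It is not part of Spring Batch or the samples: FlatteningItemReader is a hypothetical name, and the small ItemReader interface declared at the top is only a stand-in for org.springframework.batch.item.ItemReader so that the sketch compiles on its own. In a real project you would delete the stand-in and import the Spring Batch interface instead.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Stand-in for org.springframework.batch.item.ItemReader (assumption: used
// here only so the sketch compiles without Spring Batch on the classpath).
interface ItemReader<T> {
    T read() throws Exception;
}

// Hypothetical adapter: turns a reader that yields lists into a reader that
// yields one item per call, which is what a chunk-oriented step expects.
class FlatteningItemReader<T> implements ItemReader<T> {

    private final ItemReader<List<T>> delegate;

    // Buffers the items of the most recently read list.
    private final Deque<T> queue = new ArrayDeque<>();

    FlatteningItemReader(ItemReader<List<T>> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T read() throws Exception {
        // Refill the buffer until it holds an item or the delegate is exhausted.
        // A loop (rather than a single if) skips over empty lists, which would
        // otherwise be mistaken for the end of the input.
        while (queue.isEmpty()) {
            List<T> batch = delegate.read();
            if (batch == null) {
                return null; // null tells the step that the input is exhausted
            }
            queue.addAll(batch);
        }
        return queue.poll();
    }
}
```

With such an adapter, the step can be declared with proper types, for example a FlatteningItemReader<Trade> wrapping the AggregateItemReader<Trade>, together with an ItemProcessor<Trade, Trade> and an ItemWriter<Trade>, and no @SuppressWarnings is needed. Note that this sketch does not forward open(), update() or close(); if the delegate is an ItemStreamReader, the adapter would also have to implement ItemStream and pass those calls through, or the delegate would have to be registered as a stream on the step. Items still buffered in the queue are not saved for restart.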