How to control chunk-processed elements before writing them to the database?


I have this situation in my job:

  • Reader: Reads a large CSV file.
  • Processor: Processes each line and checks the database with a SELECT. If the record exists, the item is marked for update; if not, it is marked for insert.
  • Writer: A classifier that redirects each item to the insert or update writer.

My commit-interval is 1000. If the file contains repeated items within the same chunk, how can I detect that the later occurrences should be marked for update instead of insert?

File example:

Item1Code|2023-05-01|02 --> detect not exists, mark as insert
Item2Code|2023-05-02|03 --> detect not exists, mark as insert
Item3Code|2023-05-03|03 --> detect not exists, mark as insert
Item1Code|2023-05-04|03 --> detect not exists, mark as insert (should be detected as existing, because it will be inserted by line 1)
Item4Code|2023-05-05|03 --> detect not exists, mark as insert
...
commit here <<

The only solution I can think of is a commit-interval of 1. Does Spring Batch offer any tools to solve this kind of problem?

thanks in advance


Solution

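  • There is no out-of-the-box tool for this specific problem. However, it can easily be implemented with an in-memory cache and the ChunkListener interface. The cache records the ids seen in the current chunk and is cleared before each chunk starts, because items from earlier chunks are already committed and are found by the database lookup. The component must also be registered as a listener on the step so that beforeChunk is called.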

    Code example of the component:

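    import java.util.HashSet;
    import java.util.Set;
    import org.springframework.batch.core.ChunkListener;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.stereotype.Component;

    // Tracks the ids seen in the current chunk (assumes a single-threaded step).
    // On Spring Batch 4.x, also implement afterChunk and afterChunkError;
    // since 5.0 they are default methods.
    @Component
    public class InsertedCache implements ChunkListener {
        private final Set<String> cache = new HashSet<>();

        @Override
        public void beforeChunk(ChunkContext context) {
            // Earlier chunks are committed, so the database lookup covers them.
            cache.clear();
        }

        // Returns true if the id was already seen in the current chunk.
        public boolean isInserted(String id) {
            return !cache.add(id);
        }
    }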
    

    Usage example in the ItemProcessor:

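    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.beans.factory.annotation.Autowired;

    @Slf4j
    public class CustomerItemProcessor implements ItemProcessor<Object, Object> {

        @Autowired
        private InsertedCache insertedCache;

        @Override
        public Object process(Object item) {
            if (item instanceof Customer) {
                Customer customer = (Customer) item;
                // True when an earlier item in this chunk had the same name.
                if (insertedCache.isInserted(customer.getName())) {
                    // Mark the item for update here instead of insert.
                    log.info("already inserted customer: {}", customer);
                }
            }
            return item;
        }
    }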
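
To see how the cache interacts with the database check from the question, here is a minimal plain-Java sketch with no Spring dependencies. Everything in it is a hypothetical stand-in: `ChunkDuplicateDemo`, `Action`, `process`, `commit` and `run` are illustrative names, the "database" is simulated with a `Set`, and the real job would do a SELECT and use the classifier writer instead. It only illustrates the decision rule: an item is an update if it is already in the database or if the same code appeared earlier in the current chunk.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of the job: plain Java, no Spring Batch involved.
class ChunkDuplicateDemo {

    enum Action { INSERT, UPDATE }

    // Codes already committed to the (simulated) database.
    private final Set<String> database = new HashSet<>();
    // Codes seen so far in the current chunk (the role of InsertedCache).
    private final Set<String> seenInChunk = new HashSet<>();

    // Mirrors ChunkListener.beforeChunk: forget the previous chunk.
    void beforeChunk() {
        seenInChunk.clear();
    }

    // Mirrors the processor: decide INSERT or UPDATE for one item code.
    Action process(String code) {
        boolean firstTimeInChunk = seenInChunk.add(code);
        boolean inDatabase = database.contains(code);
        return (inDatabase || !firstTimeInChunk) ? Action.UPDATE : Action.INSERT;
    }

    // Mirrors the commit at the end of a chunk.
    void commit(List<String> codes) {
        database.addAll(codes);
    }

    // Runs the codes through chunks of the given size; returns the decisions in order.
    static List<Action> run(List<String> codes, int commitInterval) {
        ChunkDuplicateDemo job = new ChunkDuplicateDemo();
        List<Action> decisions = new ArrayList<>();
        for (int start = 0; start < codes.size(); start += commitInterval) {
            int end = Math.min(start + commitInterval, codes.size());
            List<String> chunk = codes.subList(start, end);
            job.beforeChunk();
            for (String code : chunk) {
                decisions.add(job.process(code));
            }
            job.commit(chunk);
        }
        return decisions;
    }

    public static void main(String[] args) {
        // The codes from the file example in the question.
        List<String> codes = List.of(
                "Item1Code", "Item2Code", "Item3Code", "Item1Code", "Item4Code");
        System.out.println(run(codes, 1000));
    }
}
```

With the five codes from the file example, the second Item1Code is marked UPDATE whether the commit interval is 1000 (caught by the in-chunk cache) or 2 (caught by the simulated database lookup), which is why clearing the cache before every chunk is sufficient in this sketch.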