java, mongodb, spring-boot, spring-batch

Spring Batch reads only half the data when the field the Reader queries on is being updated by the Writer


I have a peculiar problem with my Spring Boot batch processing Job.

The Job reads only half the data from MongoDB, and with each subsequent run it reads half of what remains (e.g. 30k -> 15k -> 7.5k...). It finally finishes when fewer than 1000 documents remain, since I set cursorBatchSize(1000) in my ItemReader.

The issue occurs because my ItemReader queries for all documents that do not have the processedAt attribute. It passes them on to the ItemWriter, which processes them, adds the processedAt attribute, and then saves them back into the same collection the reader is currently going through.

I wrote an entirely new Spring Boot application to recreate this, removed everything irrelevant to the issue, minimized all data models, etc., so there is nothing custom about the Job or Batch config.

It doesn't look like a memory issue, since the Job works fine when I run it with the query parameters set to return all documents.

I don't think it's a race condition either, since the issue persists after setting throttleLimit(1) or specifying a TaskExecutor with a single thread.
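
For reference, the single-threaded check was along these lines (a sketch; the actual step definition further down uses throttleLimit(10) and no TaskExecutor):

    // Variant 1: cap concurrent chunk processing on the step itself
    stepBuilderFactory.get("step")
            .<TroublesomeModel, TroublesomeModel>chunk(100)
            .reader(myReader)
            .writer(myWriter)
            .throttleLimit(1)
            .build();

    // Variant 2: give the step a TaskExecutor that allows only one thread
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(1);
    // ... then .taskExecutor(taskExecutor) on the same step builder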

The document in question has only two fields: a name attribute and a processedAt attribute, which is not added until the document has been successfully processed.
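
For completeness, the model is essentially this (a minimal sketch reconstructed from the accessors used in the snippets below; the timestamp type and annotations are assumptions):

    import java.time.Instant;

    import org.springframework.data.annotation.Id;
    import org.springframework.data.mongodb.core.mapping.Document;

    @Document
    public class TroublesomeModel {

        @Id
        private String id;

        private String name;

        // Absent until the document has been successfully processed
        private Instant processedAt;

        public String getId() {
            return id;
        }

        public Instant getProcessedAt() {
            return processedAt;
        }

        // Called by the ItemWriter before it builds the bulk update
        public void setAsProcessed() {
            this.processedAt = Instant.now();
        }
    }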

Here's the ItemReader:

    @Bean(name = "myReader")
    @StepScope
    public MongoItemReader<TroublesomeModel> reader() {
        Query query = new Query();
        query.addCriteria(Criteria.where("processedAt").exists(false));

        query.noCursorTimeout();
        query.cursorBatchSize(1000);

        MongoItemReader<TroublesomeModel> reader = new MongoItemReader<>();
        reader.setTemplate(mongoTemplate);
        reader.setTargetType(TroublesomeModel.class);
        reader.setSaveState(false);
        reader.setQuery(query);

        return reader;
    }

The ItemWriter:

    @Override
    public void write(List<? extends TroublesomeModel> items) {
        BulkOperations bulkOperations = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, TroublesomeModel.class);

        for (TroublesomeModel item: items) {
            item.setAsProcessed();
            bulkOperations.updateMulti(
                    Query.query(Criteria.where("_id").is(item.getId())),
                    new Update().set("processedAt", item.getProcessedAt())
            );
        }

        bulkOperations.execute();
    }

    @Bean(name = "myWriter")
    @StepScope
    public ItemWriter<TroublesomeModel> writer() {
        return new BatchItemWriterConfig(mongoTemplate);
    }

And the Job and Step configuration:

    @Bean(name = "myStep")
    public Step step(
            @Qualifier("myReader") MongoItemReader<TroublesomeModel> myReader,
            @Qualifier("myWriter") ItemWriter<? super TroublesomeModel> myWriter
    ) {
        return stepBuilderFactory.get("step")
                .<TroublesomeModel, TroublesomeModel>chunk(100) // has to be 100
                .reader(myReader)
                .writer(myWriter)
                .throttleLimit(10)
                .build();
    }

    @Bean(name = "myJob")
    public Job myJob(@Qualifier("myStep") Step step) {
        return jobBuilderFactory
                .get(JOB_NAME)
                .start(step)
                .build();
    }

Is my current logic just not feasible to implement or is there something I'm missing?


Solution

  • I figured out what the issue was: it's caused by the way the ItemReader fetches new pages of data.

    I logged every query on the database and came upon this line:

    { find: "message", filter: { processedAt: { $exists: false } },
    skip: 8190, limit: 10, batchSize: 1000, noCursorTimeout: true, $db: ...
    
    

    The interaction between skip and the filter is what loses data: the MongoItemReader pages through the results by re-running the query with an ever-increasing skip offset, but once an item has been processed (and marked as such in the database) it no longer matches the filter. Each new page's skip is therefore applied to a result set that has just shrunk by the previously written items, so the reader skips over unprocessed documents that were never read.
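
    To make the arithmetic concrete, here is a small stand-alone simulation of that paging behaviour (plain Java, no Spring or MongoDB; the class name and numbers are only illustrative). Each "page" re-runs the filtered query with a growing skip while the previous page's documents have just disappeared from the result set, so every page leaves one page's worth of unprocessed documents behind:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class SkipFilterSimulation {

        public static void main(String[] args) {
            int total = 30_000;
            int pageSize = 10; // matches the limit: 10 in the logged query

            // false = the document does not yet have a processedAt field
            List<Boolean> processed = new ArrayList<>();
            for (int i = 0; i < total; i++) {
                processed.add(false);
            }

            int page = 0;
            while (true) {
                int skip = page * pageSize;
                // Re-run the filtered "query": only unprocessed documents are visible,
                // then apply skip and limit exactly like the logged find command
                List<Integer> pageItems = IntStream.range(0, total)
                        .filter(i -> !processed.get(i))
                        .skip(skip)
                        .limit(pageSize)
                        .boxed()
                        .collect(Collectors.toList());
                if (pageItems.isEmpty()) {
                    break;
                }
                // "Write" the page: marking its documents as processed removes them
                // from the next query's result set, which is what breaks the skip offset
                pageItems.forEach(i -> processed.set(i, true));
                page++;
            }

            long done = processed.stream().filter(p -> p).count();
            System.out.println("Processed " + done + " of " + total); // prints 15000 of 30000, i.e. exactly half
        }
    }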

    The conclusion is that it is not possible to update the field the reader filters on while the same job is still paging through the data.


    The workaround I used was to remove the processedAt criterion from the query and add an ItemProcessor that checks whether the item has already been processed before passing it on to the ItemWriter. The efficiency of this is questionable: if you expect the query to return mostly data that is valid for processing (i.e. without the processedAt field), it's a good solution; if you expect a lot of data to be discarded because it already carries a processedAt flag, you might want to look into other solutions (and please do share them if you figure something out).

    This is what the ItemProcessor looks like, nothing fancy:

    public class MyItemProcessor implements ItemProcessor<TroublesomeModel, TroublesomeModel> {
        @Override
        public TroublesomeModel process(TroublesomeModel item) throws Exception {
            if (item.getProcessedAt() != null) {
                // Returning null filters the already-processed item out of the chunk
                return null;
            }
            return item;
        }
    }
    

    And since my ItemWriter uses BulkOperations, which cannot execute an empty set of operations, the writer also needs an empty-list check before it builds the bulk update, as sketched below.
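
    That check is just an early return at the top of write() (a sketch; the guard is the only change compared to the writer shown above):

    @Override
    public void write(List<? extends TroublesomeModel> items) {
        // A whole chunk can be filtered out by the processor, and executing a
        // BulkOperations with zero queued operations fails, so bail out early
        if (items == null || items.isEmpty()) {
            return;
        }
        // ... build and execute the bulk update exactly as in the writer above
    }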


    An alternative workaround is to execute the query as-is, with the processedAt criterion, set the reader's maxItemCount to a low enough number (1000 in my case), and re-run the job on a schedule. Documents skipped during one run are simply picked up by a later run, because each new execution starts paging again from skip 0, and the filter guarantees nothing is processed twice. This fits my use case, since the job is supposed to actively process data as it arrives: no data is duplicated, and there are no unnecessary passes over the data or lengthy database queries.
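
    A sketch of that setup, assuming a plain @Scheduled relauncher (the class name and delay are placeholders, and @EnableScheduling has to be present on some configuration class); the reader bean above additionally calls reader.setMaxItemCount(1000):

    @Component
    public class TroublesomeJobScheduler {

        private final JobLauncher jobLauncher;
        private final Job myJob;

        public TroublesomeJobScheduler(JobLauncher jobLauncher, @Qualifier("myJob") Job myJob) {
            this.jobLauncher = jobLauncher;
            this.myJob = myJob;
        }

        // Relaunch the capped job every minute; unique parameters make each run a new JobInstance
        @Scheduled(fixedDelay = 60_000)
        public void relaunch() throws Exception {
            jobLauncher.run(myJob, new JobParametersBuilder()
                    .addLong("startedAt", System.currentTimeMillis())
                    .toJobParameters());
        }
    }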