I have a peculiar problem with my Spring Boot batch processing Job.
The Job reads only half the data from MongoDB, and with each subsequent run it reads half of the remaining data (e.g. 30k -> 15k -> 7.5k...). It finally finishes when there are fewer than 1000 documents remaining, since I set cursorBatchSize(1000) in my ItemReader.
The issue occurs because my ItemReader queries for all documents that do not have the processedAt attribute. It then passes them on to the ItemWriter, which processes them, adds the processedAt attribute to the documents, and saves them back into the same collection the reader is currently going through.
I wrote an entirely new Spring Boot application to recreate this, removed everything that's irrelevant to the issue, minimized all data models, etc., so there's nothing custom about the Job or Batch config.
It doesn't look like a memory issue, since the job works fine when I run it with the query parameters set to return all documents.
I don't think it's a race condition, since the issue persists after setting throttleLimit(1) or specifying a TaskExecutor with only one thread.
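For context, that single-threaded attempt looked roughly like the sketch below (the exact ThreadPoolTaskExecutor settings are incidental to the problem):

ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(1);
taskExecutor.setMaxPoolSize(1);
taskExecutor.initialize();

return stepBuilderFactory.get("step")
        .<TroublesomeModel, TroublesomeModel>chunk(100)
        .reader(myReader)
        .writer(myWriter)
        .taskExecutor(taskExecutor)
        .throttleLimit(1)
        .build();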
The document in question only has two fields: a name attribute and a processedAt attribute, which isn't appended onto it until the document has been successfully processed.
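For reference, a minimal sketch of what that document model might look like (the annotations, field types and getters are assumptions based on how the class is used in the code below; the collection name "message" is taken from the query log shown further down):

import java.util.Date;

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document(collection = "message") // collection name as seen in the logged query
public class TroublesomeModel {

    @Id
    private String id;

    private String name;

    // absent until the document has been successfully processed
    private Date processedAt;

    public String getId() { return id; }

    public Date getProcessedAt() { return processedAt; }

    public void setAsProcessed() { this.processedAt = new Date(); }
}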
Here's the ItemReader:
@Bean(name = "myReader")
@StepScope
public MongoItemReader<TroublesomeModel> reader() {
Query query = new Query();
query.addCriteria(Criteria.where("processedAt").exists(false));
query.noCursorTimeout();
query.cursorBatchSize(1000);
MongoItemReader<TroublesomeModel> reader = new MongoItemReader<>();
reader.setTemplate(mongoTemplate);
reader.setTargetType(TroublesomeModel.class);
reader.setSaveState(false);
reader.setQuery(query);
return reader;
}
The ItemWriter:
@Override
public void write(List<? extends TroublesomeModel> items) {
    BulkOperations bulkOperations = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, TroublesomeModel.class);
    for (TroublesomeModel item : items) {
        item.setAsProcessed();
        bulkOperations.updateMulti(
                Query.query(Criteria.where("_id").is(item.getId())),
                new Update().set("processedAt", item.getProcessedAt())
        );
    }
    bulkOperations.execute();
}
@Bean(name = "myWriter")
@StepScope
public ItemWriter<TroublesomeModel> writer() {
return new BatchItemWriterConfig(mongoTemplate);
}
And the Job and Step configuration:
@Bean(name = "myStep")
public Step step(
@Qualifier("myReader") MongoItemReader<TroublesomeModel> myReader,
@Qualifier("myWriter") ItemWriter<? super TroublesomeModel> myWriter
) {
return stepBuilderFactory.get("step")
.<TroublesomeModel, TroublesomeModel>chunk(100) // has to be 100
.reader(myReader)
.writer(myWriter)
.throttleLimit(10)
.build();
}
@Bean(name = "myJob")
public Job myJob(@Qualifier("myStep") Step step) {
return jobBuilderFactory
.get(JOB_NAME)
.start(step)
.build();
}
Is my current logic just not feasible to implement or is there something I'm missing?
I figured out what the issue was, and it's caused by the way the MongoItemReader fetches new pages of data.
I logged every query on the database and came upon this line:
{ find: "message", filter: { processedAt: { $exists: false } },
skip: 8190, limit: 10, batchSize: 1000, noCursorTimeout: true, $db: ...
The interaction between skip and filter causes data to be skipped: once an item has been processed (and marked as such in the database), it is no longer returned by the filter query. The reader still advances skip by a full page after each read, but that offset is now applied only to the shrinking set of unprocessed documents, so the reader jumps over data it has never seen.
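To make the arithmetic concrete, here is a simplified walk-through of one run, assuming the page size of 10 visible in the log above and the chunk size of 100 from the step configuration (document numbers are purely illustrative):

// start of the run: 30,000 documents match { processedAt: { $exists: false } }
//
// chunk 1: the reader pages through skip 0, 10, ..., 90 and reads documents #1-#100;
//          the writer then sets processedAt on #1-#100
// chunk 2: the reader continues at skip 100, but #1-#100 no longer match the filter,
//          so the skip of 100 jumps over the still-unprocessed documents #101-#200
//          and this chunk reads #201-#300 instead
// chunk 3: same again: another 100 unprocessed documents are skipped, 100 are read
// ...
// every chunk that gets written removes 100 documents from the filtered result set,
// so the next chunk's skip offset overshoots by another 100; the run ends once skip
// exceeds the number of still-matching documents, after reading roughly half of what
// was unprocessed at the start (hence 30k -> 15k -> 7.5k)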
The conclusion from this is that it is not possible to update the very field the reader filters on while the reader is still paging through the collection.
The workaround I used was to remove the processedAt criterion from the query and create a new ItemProcessor that checks whether the item has already been processed before passing it on to the ItemWriter. The efficiency of this is questionable. If you expect the query to return mostly data that is still valid for processing (i.e. without the processedAt field), then it's a good solution. If you expect a lot of data to be discarded because it already has the processedAt flag, then you might want to look into other solutions (and please do share them if you figure something out).
This is what the ItemProcessor looks like, nothing fancy:
public class MyItemProcessor implements ItemProcessor<TroublesomeModel, TroublesomeModel> {

    @Override
    public TroublesomeModel process(TroublesomeModel item) throws Exception {
        if (item.getProcessedAt() != null) {
            return null;
        }
        return item;
    }
}
And since I am using BulkOperations in my ItemWriter, which cannot execute an empty list of operations, there also needs to be a check for an empty chunk before building and running the bulk update.
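A sketch of how that check might look in the write method from above (the empty-chunk guard is my interpretation of what needs to be covered; items the processor returns null for are already filtered out by Spring Batch before the writer is called):

@Override
public void write(List<? extends TroublesomeModel> items) {
    // items filtered out by the processor never reach the writer,
    // so the remaining case to guard against is a chunk that ends up empty
    if (items.isEmpty()) {
        return; // avoids executing a BulkOperations with no registered operations
    }
    BulkOperations bulkOperations = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, TroublesomeModel.class);
    for (TroublesomeModel item : items) {
        item.setAsProcessed();
        bulkOperations.updateMulti(
                Query.query(Criteria.where("_id").is(item.getId())),
                new Update().set("processedAt", item.getProcessedAt())
        );
    }
    bulkOperations.execute();
}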
An alternative workaround would be to execute the query as-is, with the processedAt criterion, set the reader's maxItemCount to a low enough number (in my case 1000), and re-run the job inside a scheduler on repeat. This fits my use case since the job is supposed to actively process data as it arrives. No data is duplicated, and it doesn't cause any unnecessary data processing or lengthy database queries.
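A sketch of that variant, assuming a plain fixed-delay scheduler (the class name, delay and job parameter key are made up for illustration; setMaxItemCount is the standard item-count cap on Spring Batch readers, and @Scheduled requires @EnableScheduling somewhere in the configuration):

// in the reader bean, cap how many items a single run may read
reader.setMaxItemCount(1000);

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class MyJobScheduler {

    private final JobLauncher jobLauncher;
    private final Job myJob;

    public MyJobScheduler(JobLauncher jobLauncher, @Qualifier("myJob") Job myJob) {
        this.jobLauncher = jobLauncher;
        this.myJob = myJob;
    }

    @Scheduled(fixedDelay = 60_000) // re-run every minute; each run reads at most 1000 documents
    public void runJob() throws Exception {
        // unique parameters so every run starts a new JobInstance
        jobLauncher.run(myJob, new JobParametersBuilder()
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters());
    }
}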