java mongodb spring-boot aggregation

Using $merge in Document-based aggregation pipeline is not working


I have a collection on which I would like to run an aggregation and write the result into a separate collection in the same database. While going through the docs, I came across $merge, which does exactly what I want. I came up with the following mongo shell pipeline, which works flawlessly.

db.getCollection('SOURCE_COLLECTION').aggregate([
  {
    "$match": {type: 'ABC'}
  },
  {
    "$merge": {
      "into": "OUTPUT_COLLECTION",
      "whenMatched": "replace"
    }
  }
])

Now I need the same effect in Spring Boot, for which I have come up with the following, which in theory should behave no differently.

final ArrayList<Document> pipeline = new ArrayList<>();

pipeline.add(Document.parse("{$match: {type: 'ABC'}}"));
pipeline.add(Document.parse("{$merge: {into: 'OUTPUT_COLLECTION', whenMatched: 'replace'}}"));

mongoTemplate.getDb()
    .getCollection("SOURCE_COLLECTION", Document.class)
    .aggregate(pipeline);

Nonetheless, this does not work: nothing is written to OUTPUT_COLLECTION. As can be seen, I am using the MongoCollection<T>.aggregate() method, which takes a List<Document> as the pipeline. Each stage in the pipeline is created by parsing a JSON string into a Document.

Interestingly, when I replace the $merge stage with $out, it works without any issue.

final ArrayList<Document> pipeline = new ArrayList<>();

pipeline.add(Document.parse("{$match: {type: 'ABC'}}"));
pipeline.add(Document.parse("{$out: 'OUTPUT_COLLECTION'}"));

mongoTemplate.getDb()
    .getCollection("SOURCE_COLLECTION", Document.class)
    .aggregate(pipeline);

But this is no good to me, because this aggregation will be executed multiple times (I am actually trying to populate a materialised-view kind of collection here), and $out replaces the whole output collection on every run. I need $merge to work, but it doesn't. What am I missing? Can someone see something that I haven't?


Solution

  • It looks like aggregate(pipeline) does not behave like a terminal operation when $merge is used in the pipeline: it returns an AggregateIterable, and the merge does not happen until something consumes that result. For the merge to take effect, an operation such as void toCollection() or void forEach(Consumer<? super T> action) has to be invoked on it. Invoking toCollection() on the value returned by aggregate() did the trick.

    final ArrayList<Document> pipeline = new ArrayList<>();
    
    pipeline.add(Document.parse("{$match: {type: 'ABC'}}"));
    pipeline.add(Document.parse("{$merge: {into: 'OUTPUT_COLLECTION', whenMatched: 'replace'}}"));
    
    mongoTemplate.getDb()
        .getCollection("SOURCE_COLLECTION", Document.class)
        .aggregate(pipeline)
        .toCollection(); // This invocation completes the pipeline when it ends with $merge.
    

    Interestingly, nothing of this sort seems to be mentioned in the documentation (at least not in any that I could find). I am not even sure whether this is the expected behaviour.
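
The behaviour described above is what one would expect if aggregate() only hands back a lazy handle and the work starts when a terminal method is called on it. As a rough illustration of that idea, here is a toy model in plain Java with no MongoDB dependency. LazyPipelineDemo, LazyAggregate and the in-memory lists are hypothetical stand-ins made up for this sketch; this is not the driver's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: every name here is hypothetical and nothing talks to MongoDB.
class LazyPipelineDemo {

    /** Stand-in for the handle that aggregate() returns; creating it runs nothing. */
    static final class LazyAggregate {
        private final List<String> source;
        private final List<String> target;

        LazyAggregate(List<String> source, List<String> target) {
            this.source = source;
            this.target = target;
        }

        /** Terminal operation: runs the pipeline and discards the results. */
        void toCollection() {
            run();
        }

        /** Terminal operation: runs the pipeline and passes each result on. */
        void forEach(Consumer<? super String> action) {
            run().forEach(action);
        }

        // The "$match" step keeps entries starting with "ABC"; the "$merge" step
        // adds them to the target unless an equal entry is already there.
        private List<String> run() {
            List<String> matched = new ArrayList<>();
            for (String doc : source) {
                if (doc.startsWith("ABC")) {
                    matched.add(doc);
                }
            }
            for (String doc : matched) {
                if (!target.contains(doc)) {
                    target.add(doc);
                }
            }
            return matched;
        }
    }

    /** Only describes the work to do; it does not execute anything. */
    static LazyAggregate aggregate(List<String> source, List<String> target) {
        return new LazyAggregate(source, target);
    }

    public static void main(String[] args) {
        List<String> source = List.of("ABC-1", "XYZ-2", "ABC-3");
        List<String> output = new ArrayList<>();

        LazyAggregate pending = aggregate(source, output);
        System.out.println(output.size()); // 0: nothing has run yet

        pending.toCollection();
        System.out.println(output);        // [ABC-1, ABC-3]
    }
}
```

In this model, calling aggregate() alone leaves the output list empty, which mirrors the symptom in the question; only toCollection() or forEach() triggers the write.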