Tags: java, mongodb, spring-boot, watch, changestream

How do I iterate over a MongoDB Change Stream in Spring Boot?


I have read countless articles and code examples on MongoDB Change Streams, but I still can't set one up properly. I'm trying to listen to a specific collection in my MongoDB database, and whenever a document is inserted, updated, or deleted, I want to do something.

This is what I've tried:

@Data
@Document(collection = "teams")
public class Teams {
    @MongoId(FieldType.OBJECT_ID)
    private ObjectId id;
    private Integer teamId;
    private String name;
    private String description;
}

import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;

import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.ChangeStreamIterable;

import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.Arrays;
import java.util.List;

public class MongoDBChangeStream {

    // connect to the local database server
    MongoClient mongoClient = MongoClients.create("db uri goes here");

    // Select the MongoDB database
    MongoDatabase database = mongoClient.getDatabase("MyDatabase");

    // Select the collection to query
    MongoCollection<Document> collection = database.getCollection("teams");

    // Create pipeline for operationType filter
    List<Bson> pipeline = Arrays.asList(
            Aggregates.match(
                    Filters.in("operationType",
                            Arrays.asList("insert", "update", "delete"))));

    // Create the Change Stream
    ChangeStreamIterable<Document> changeStream = collection.watch(pipeline)
            .fullDocument(FullDocument.UPDATE_LOOKUP);

    // Iterate over the Change Stream
    for (Document changeEvent : changeStream) {
        // Process the change event here
    }
}

This is what I have so far. Everything is fine until the for-loop, which gives three errors:

  1. There is a red line under 'for (', which says unexpected token.
  2. There is a red line under ' :', which says ';' expected.
  3. There is a red line under 'changeStream)', which says unknown class: 'changeStream'.

Solution

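  • First, statements such as a for-loop must go inside a method, not directly in the class body. Only declarations (such as fields with initializers) are allowed there, which is why the compiler reports an unexpected token at 'for'. Second, the element type of a ChangeStreamIterable<Document> is ChangeStreamDocument<Document>, not Document, so the loop variable must be declared as ChangeStreamDocument<Document>.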

    Summing things up:

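    // Same imports as in the question, plus:
    import com.mongodb.client.model.changestream.ChangeStreamDocument;
    
    public class MongoDBChangeStream {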
    
        public void someMethod() {
    
            // connect to the local database server
            MongoClient mongoClient = MongoClients.create("db uri goes here");
    
            // Select the MongoDB database
            MongoDatabase database = mongoClient.getDatabase("MyDatabase");
    
            // Select the collection to query
            MongoCollection<Document> collection = database.getCollection("teams");
    
            // Create pipeline for operationType filter
            List<Bson> pipeline = Arrays.asList(
                    Aggregates.match(
                            Filters.in(
                                    "operationType",
                                    Arrays.asList("insert", "update", "delete")
                            )));
    
            // Create the Change Stream
            ChangeStreamIterable<Document> changeStream = collection.watch(pipeline)
                    .fullDocument(FullDocument.UPDATE_LOOKUP);
    
            // Iterate over the Change Stream
            for (ChangeStreamDocument<Document> changeEvent : changeStream) {
                // Process the change event here
            }
        }
    }
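
To see both points in isolation, here is a minimal sketch that uses only the Java standard library. ChangeEvent and EventStream are hypothetical stand-ins (not driver classes) that mirror the relationship described above: the stream is an Iterable of the wrapper type, not of the payload type. The field initializer is legal in the class body, while the loop has to sit inside a method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for ChangeStreamDocument<T>: wraps the payload with event metadata.
final class ChangeEvent<T> {
    final String operationType;
    final T fullDocument;

    ChangeEvent(String operationType, T fullDocument) {
        this.operationType = operationType;
        this.fullDocument = fullDocument;
    }
}

// Hypothetical stand-in for ChangeStreamIterable<T>: it is an Iterable of
// ChangeEvent<T>, not an Iterable of T.
final class EventStream<T> implements Iterable<ChangeEvent<T>> {
    private final List<ChangeEvent<T>> events;

    EventStream(List<ChangeEvent<T>> events) {
        this.events = events;
    }

    @Override
    public Iterator<ChangeEvent<T>> iterator() {
        return events.iterator();
    }
}

class ElementTypeDemo {
    // A declaration with an initializer is allowed directly in the class body.
    private final EventStream<String> stream = new EventStream<>(Arrays.asList(
            new ChangeEvent<>("insert", "team A"),
            new ChangeEvent<>("delete", "team B")));

    // A loop is a statement, so it must live inside a method.
    List<String> describeEvents() {
        List<String> lines = new ArrayList<>();
        // "for (String doc : stream)" would not compile here:
        // the element type is ChangeEvent<String>, not String.
        for (ChangeEvent<String> event : stream) {
            lines.add(event.operationType + ": " + event.fullDocument);
        }
        return lines;
    }

    public static void main(String[] args) {
        new ElementTypeDemo().describeEvents().forEach(System.out::println);
    }
}
```

The same reasoning applies to the real driver types: the loop variable takes the type the iterable actually yields, and the payload is read from that wrapper object.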