How do I actually start a MongoDB Change Stream in Spring Boot?


So I've managed to build up a very basic MongoDB Change Stream in my Spring Boot application:

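import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.Arrays;
import java.util.List;

public class MongoDBChangeStream {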

    public void changeStream() {

        // connect to the local database server
        MongoClient mongoClient = MongoClients.create("db uri goes here");

        // Select the MongoDB database
        MongoDatabase database = mongoClient.getDatabase("MyDatabase");

        // Select the collection to query
        MongoCollection<Document> collection = database.getCollection("teams");

        // Create pipeline for operationType filter
        List<Bson> pipeline = Arrays.asList(
                Aggregates.match(
                        Filters.in(
                                "operationType",
                                Arrays.asList("insert", "update", "delete")
                        )));

        // Create the Change Stream
        ChangeStreamIterable<Document> changeStream = collection.watch(pipeline)
                .fullDocument(FullDocument.UPDATE_LOOKUP);

        // Iterate over the Change Stream; this loop blocks the calling thread while it waits for events
        for (ChangeStreamDocument<Document> changeEvent : changeStream) {
            // Process the change event here
            switch (changeEvent.getOperationType()) {
                case INSERT:
                    System.out.println("MongoDB Change Stream detected an insert");
                    break;
                case UPDATE:
                    System.out.println("MongoDB Change Stream detected an update");
                    break;
                case DELETE:
                    System.out.println("MongoDB Change Stream detected a delete");
                    break;
            }
        }
    }
}

As you can see, I'm listening for inserts, updates and deletes in the 'teams' collection, and then just printing the appropriate message to the console.

But how do I actually start the listener? When I run the application, something needs to call changeStream() so that it starts watching the collection. How do I do that?


Solution

  • If I understand correctly, you should be able to simply add a @Configuration class with a @PostConstruct method:

    @Configuration
    public class DatabaseChangeStreamInitialiser {
    
        @PostConstruct
        public void init() {
            // e.g. call `new MongoDBChangeStream().changeStream()`
        }
    }
    

    or

    @Component
    public class DatabaseChangeStreamInitialiser {
    
        @EventListener(ContextRefreshedEvent.class)
        public void init() {
            // e.g. call `new MongoDBChangeStream().changeStream()`
        }
    }
    

    There are many ways to do this: you can use the @EventListener annotation to listen for ContextRefreshedEvent, implement InitializingBean, or even just call the method from the main method of the class annotated with @SpringBootApplication.

    Some other options are documented here: Execute method on startup in Spring

    Hopefully that is all you need. If not, it would help if you described the exact startup process and the details of your Spring Boot app.
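One thing to watch for: the for loop in changeStream() in the question blocks while it waits for events, and a @PostConstruct method runs on the thread that is starting the application, so calling it there directly would likely keep startup from completing. A minimal sketch of one way around that, using only the JDK, is to hand the blocking call to a background thread. ChangeStreamStarter and its method names are hypothetical and are not part of Spring or the MongoDB driver.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: runs a blocking listener loop on its own thread
// so that the caller (for example a startup hook) returns immediately.
final class ChangeStreamStarter {

    private final Runnable blockingListener;

    // Single worker thread dedicated to the listener loop.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "change-stream-listener");
        t.setDaemon(true); // do not keep the JVM alive just for the listener
        return t;
    });

    ChangeStreamStarter(Runnable blockingListener) {
        this.blockingListener = blockingListener;
    }

    // Submits the blocking loop to the worker thread and returns right away.
    void start() {
        executor.submit(blockingListener);
    }

    // Interrupts the worker thread. Assumption: whether the driver's cursor
    // reacts to the interrupt is not verified here; closing the MongoClient
    // is another way to end the loop.
    void stop() {
        executor.shutdownNow();
    }
}
```

Under these assumptions, either init() method above could create the helper with `new ChangeStreamStarter(() -> new MongoDBChangeStream().changeStream())` and call start() on it, keeping a reference so that stop() can be called on shutdown.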