Tags: node.js, mongodb, elasticsearch, worker, changestream

How can I implement a Node.js worker that streams data from MongoDB to Elasticsearch?


I'm building a CDC-based application that uses MongoDB Change Streams to listen for change events and index the changes in Elasticsearch in near real time.

So far, I've implemented a worker that calls a function to capture events, transform them, and index them in Elasticsearch. This works without issues for a change stream on a single MongoDB collection:

async function syncChangeEvents() {
  const stream = ModelA.watch()
  while (!stream.isClosed()) {
    if (await stream.hasNext()) {
      // next() returns a promise, so it must be awaited as well
      const event = await stream.next()
      // transform event
      // index to elasticsearch
    }
  }
}

I've implemented it using an infinite loop (probably a bad approach) but I'm not sure what alternatives there are when I have to keep the change stream alive forever.

The problem comes when I have to implement a change stream for another model. Since the first function has a while loop that is blocking, the worker can't call the second function to start the second change stream.

What is the best way to spin up a worker that can run any number of change streams without hurting the performance of each one? Would worker threads be the right way to go?


Solution

  • There are three primary ways to work with Change Streams in Node.js.

    1. You can monitor the Change Stream using EventEmitter's on() function.

       // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
       const changeStream = collection.watch(pipeline);
      
       // ChangeStream inherits from the Node Built-in Class EventEmitter (https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_class_eventemitter).
       // We can use EventEmitter's on() to add a listener function that will be called whenever a change occurs in the change stream.
       // See https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_emitter_on_eventname_listener for the on() docs.
       changeStream.on('change', (next) => {
           console.log(next);
       });
      
       // Wait the given amount of time and then close the change stream.
       // closeChangeStream() is a helper from the complete example, not a driver function.
       await closeChangeStream(timeInMs, changeStream);
      
    2. You can monitor the Change Stream using hasNext().

       // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
       const changeStream = collection.watch(pipeline);
      
       // Set a timer that will close the change stream after the given amount of time
       // Function execution will continue because we are not using "await" here
       closeChangeStream(timeInMs, changeStream);
      
       // We can use ChangeStream's hasNext() function to wait for a new change in the change stream.
       // Once the change stream is closed, hasNext() either returns false or rejects with an error,
       // depending on the driver version, so long-running code should wrap this loop in try/catch.
       // See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html for the ChangeStream docs.
       while (await changeStream.hasNext()) {
           console.log(await changeStream.next());
       }
      
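    3. You can monitor the Change Stream using the Stream API.

       // Node's built-in stream module provides the Writable class used below
       const stream = require('stream');

       // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
       const changeStream = collection.watch(pipeline);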
      
       // See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html#pipe for the pipe() docs
       changeStream.pipe(
           new stream.Writable({
               objectMode: true,
               write: function (doc, _, cb) {
                   console.log(doc);
                   cb();
               }
           })
       );
      
       // Wait the given amount of time and then close the change stream
       await closeChangeStream(timeInMs, changeStream);
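
Applied to the question, the first approach is the one that most directly allows several change streams in one worker: on() only registers a listener and returns, so nothing blocks the next watch() call. Below is a minimal sketch of that idea, not a definitive implementation. The names watchAll, sources, handler and onError are hypothetical, and it assumes each source exposes watch() returning an EventEmitter-style change stream with a close() method, as the driver's Collection and Mongoose models do.

```javascript
/**
 * Start one change stream per source and route every event to `handler`.
 * `sources` (hypothetical shape) maps a label, e.g. the target index name,
 * to any object that exposes watch().
 * Returns a function that closes all streams, for use on shutdown.
 */
function watchAll(sources, handler, onError = console.error) {
  const streams = [];

  for (const [name, source] of Object.entries(sources)) {
    const changeStream = source.watch();

    // on() registers the listener and returns immediately, so the loop
    // moves straight on to the next source.
    changeStream.on('change', (event) => {
      // The executor runs synchronously; wrapping the call in a promise
      // funnels both thrown errors and rejected promises into onError.
      new Promise((resolve) => resolve(handler(name, event)))
        .catch((err) => onError(err, name));
    });

    // Surface stream-level errors instead of leaving them unhandled.
    changeStream.on('error', (err) => onError(err, name));

    streams.push(changeStream);
  }

  return () => Promise.all(streams.map((s) => s.close()));
}
```

A worker could call this once, for example `const closeAll = watchAll({ model_a: ModelA, model_b: ModelB }, indexEvent)`, where ModelB and indexEvent are placeholders for a second model and for the transform-and-index function. Worker threads are not required for this: change stream handling is I/O-bound, and the hasNext() loop from the second approach can also run for several collections at once if each async loop is started without awaiting it.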
      

    If your MongoDB database is hosted on Atlas (https://cloud.mongodb.com), the simplest option is to create a Trigger. Atlas manages the Change Stream for you, so you only have to write the code that transforms each event and indexes it in Elasticsearch.
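
Whichever mechanism delivers the events, the transform step could look roughly like the sketch below. It maps a change event to the action and source lines used by the Elasticsearch bulk API. The function name toBulkLines and the indexName parameter are hypothetical, and treating update events like inserts is an assumption that only holds when the stream is opened with `{ fullDocument: 'updateLookup' }`.

```javascript
/**
 * Turn one change event into Elasticsearch bulk API lines.
 * Returns an empty array for events that are not mapped here.
 */
function toBulkLines(event, indexName) {
  // documentKey is absent on collection-level events such as drop.
  const id = event.documentKey ? String(event.documentKey._id) : undefined;

  switch (event.operationType) {
    case 'insert':
    case 'replace':
    case 'update': {
      // Update events carry fullDocument only with the updateLookup option.
      if (!event.fullDocument) return [];
      // _id is a metadata field in Elasticsearch, so keep it out of the body.
      const { _id, ...doc } = event.fullDocument;
      return [{ index: { _index: indexName, _id: id } }, doc];
    }
    case 'delete':
      return [{ delete: { _index: indexName, _id: id } }];
    default:
      // drop, rename, invalidate, etc. are ignored in this sketch.
      return [];
  }
}
```

The returned lines can be collected across events and sent in one bulk request. The exact request parameter differs between Elasticsearch client versions, so check the client documentation for the version in use.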

    More information on working with Change Streams and Triggers is available in my blog post. A complete code example for all of the snippets above is available on GitHub.