Watch for Changes

You can watch for changes to a single collection, a database, or an entire deployment in MongoDB with Change Streams. Open a change stream by calling the watch() method on a Collection, Db, or MongoClient object. The change stream emits change event documents when they occur.

The watch() method optionally takes an aggregation pipeline, which is an array of aggregation stages, as its first parameter. The aggregation stages filter and transform the change events.

In the example below, the $match stage will match all change event documents with a runtime value of less than 15, filtering all others out.

const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = collection.watch(pipeline);
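As a variation on the pipeline above, the sketch below builds a $match stage from a parameter. It rests on one assumption worth stating: in a change event, the fields of the changed document are nested under fullDocument, so a filter on a document field is commonly written with a "fullDocument." prefix. The helper name buildShortMoviePipeline and its maxRuntime parameter are hypothetical and are not part of the driver.

```javascript
// Hypothetical helper (not part of the driver): builds a pipeline that keeps
// only insert events whose inserted document has a `runtime` below `maxRuntime`.
function buildShortMoviePipeline(maxRuntime) {
  return [
    {
      $match: {
        operationType: "insert",
        // Assumption: document fields sit under `fullDocument` in the event.
        "fullDocument.runtime": { $lt: maxRuntime },
      },
    },
  ];
}

const shortMoviePipeline = buildShortMoviePipeline(15);
console.log(JSON.stringify(shortMoviePipeline, null, 2));

// With a Collection object named `collection`, as in the text above:
// const changeStream = collection.watch(shortMoviePipeline);
```

Building the pipeline in a function keeps the threshold in one place and lets you inspect the stages before passing them to watch().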

The watch() method accepts an options object as the second parameter. Refer to the links at the end of this section for more information on the settings you can configure with this object.

The watch() method returns an instance of a ChangeStream. You can read events from a change stream either by iterating over it or by listening for events.

Warning

Avoid mixing the iterative and event-based approaches to reading from a change stream, as doing so can cause unexpected behavior.

You can call methods on the ChangeStream object such as:

  • hasNext() to check for remaining documents in the stream
  • next() to request the next document in the stream
  • close() to close the ChangeStream
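The three methods listed above can be combined into a simple read loop. The following is a minimal sketch rather than the driver's recommended pattern: readEvents and limit are hypothetical names, and the function accepts any object that exposes hasNext(), next(), and close(), which is what a ChangeStream provides according to the list above.

```javascript
// Hypothetical helper: reads up to `limit` events with the iterative API,
// then closes the stream whether or not the loop finished normally.
async function readEvents(changeStream, limit) {
  const events = [];
  try {
    // hasNext() resolves once another event is available (or the stream ends).
    while (events.length < limit && (await changeStream.hasNext())) {
      events.push(await changeStream.next());
    }
  } finally {
    await changeStream.close();
  }
  return events;
}

// Possible usage with a Collection object named `collection`:
// const events = await readEvents(collection.watch(), 5);
```

Note that on a live change stream, hasNext() waits until the next event arrives, so a loop like this may wait indefinitely if no further changes occur before the limit is reached.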

Visit the following resources for additional material on the classes and methods presented above:

Note

Change events that contain information on update operations only return the modified fields by default rather than the full updated document. You can configure your change stream to also return the most current version of the document by setting the fullDocument field of the options object to "updateLookup" as follows:

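const options = { fullDocument: "updateLookup" };
// Pass the options object as the second parameter of watch().
// The empty array is a pipeline with no stages.
const changeStream = collection.watch([], options);
changeStream.on("change", callbackFunction);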
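To illustrate the difference between the modified fields and the full document, here is a minimal sketch of a function that could serve as the change callback. The name summarizeUpdate is hypothetical. It assumes the usual shape of an update event: an updateDescription with updatedFields and removedFields, a documentKey, and a fullDocument that is present only when "updateLookup" is set.

```javascript
// Hypothetical helper: summarizes an update change event.
// Returns null for any other operation type.
function summarizeUpdate(event) {
  if (event.operationType !== "update") {
    return null;
  }
  const { updatedFields = {}, removedFields = [] } =
    event.updateDescription || {};
  return {
    id: event.documentKey._id,
    changed: Object.keys(updatedFields),
    removed: removedFields,
    // True only if the event carries a looked-up copy of the document.
    hasFullDocument: event.fullDocument != null,
  };
}

// Possible usage as the listener from the snippet above:
// changeStream.on("change", (event) => console.log(summarizeUpdate(event)));
```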

The following example opens a change stream on the movies collection in the sample_mflix database. Let's create a listener function to receive and print change events that occur on the collection.

First, open the change stream on the collection and then define a callback on the change stream using the on() method. Once set, generate a change event to be emitted by performing a change to the collection.

To generate the change event on the collection, let's use the insertOne() method to add a new document. Since insertOne() may run before the listener function can register, we use a timer, declared with setTimeout(), to wait one second before executing the insert.

We also use a second timer to wait an additional second after the document is inserted. This provides ample time for the change event to be received and for the callback to complete its execution before we close the ChangeStream instance using the close() method.

The timers used in this example are only necessary for this demonstration. They make sure there is enough time to register the listener and to have the callback process the event before exiting.
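Outside of a demonstration, one possible alternative to the second timer is to wait for the event itself, with a timeout as a safety net. The sketch below assumes that a ChangeStream behaves as a standard Node.js EventEmitter and that the runtime supports AbortController and the signal option of events.once (Node.js 15 or later). The name waitForFirstChange and the timeoutMs parameter are hypothetical. It does not remove the need to register the listener before the insert runs.

```javascript
const { once } = require("events");

// Hypothetical helper: resolves with the first "change" event, or rejects
// if none arrives within `timeoutMs` milliseconds.
async function waitForFirstChange(changeStream, timeoutMs) {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    // once() resolves with the array of arguments passed to the listener.
    const [event] = await once(changeStream, "change", {
      signal: controller.signal,
    });
    return event;
  } finally {
    clearTimeout(timer);
  }
}

// Possible usage: start waiting, perform the write, then await the event.
// const pending = waitForFirstChange(changeStream, 5000);
// await movies.insertOne({ test: "sample movie document" });
// console.log(await pending);
```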

Note

This example connects to an instance of MongoDB using a connection string. To learn more about connecting to your MongoDB instance, see the connection guide.

const { MongoClient } = require("mongodb");

// Replace the uri string with your MongoDB deployment's connection string.
const uri =
  "mongodb+srv://<user>:<password>@<cluster-url>?writeConcern=majority";

const client = new MongoClient(uri);

let changeStream;
async function run() {
  try {
    await client.connect();
    const database = client.db("sample_mflix");
    const movies = database.collection("movies");

    // open a Change Stream on the "movies" collection
    changeStream = movies.watch();

    // set up a listener when change events are emitted
    changeStream.on("change", next => {
      // process any change event
      console.log("received a change to the collection: \t", next);
    });

    // use a timeout to ensure the listener is registered before the insertOne
    // operation is called.
    await new Promise(resolve => {
      setTimeout(async () => {
        await movies.insertOne({
          test: "sample movie document",
        });
        // wait to close `changeStream` after the listener receives the event
        setTimeout(async () => {
          resolve(await changeStream.close());
        }, 1000);
      }, 1000);
    });
  } finally {
    await client.close();
  }
}
run().catch(console.dir);

If you run the example above, you should see output similar to the following:

received a change to the collection: {
  _id: {
    _data: '825EC...'
  },
  operationType: 'insert',
  clusterTime: Timestamp { ... },
  fullDocument: { _id: 5ec3..., test: 'sample movie document' },
  ns: { db: 'sample_mflix', coll: 'movies' },
  documentKey: { _id: 5ec3... },
}