Change Streams

New in version 3.6.

Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a collection and immediately react to them.

Open A Change Stream

You can only open a change stream against replica sets or sharded clusters. For a sharded cluster, you must issue the open change stream operation against the mongos.

The replica set or the sharded cluster must use replica set protocol version 1 (pv1) and the WiredTiger storage engine (which can be encrypted).

The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

The following example opens a change stream against a replica set. The change stream is bound to a collection and change stream documents are iterated with a cursor. This cursor remains open until it is explicitly closed, as long as a connection to the MongoDB deployment remains open and the collection exists.

Python:

cursor = db.inventory.watch()
document = next(cursor)

Java:

MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();

Node.js:

const changeStream = collection.watch();
changeStream.next(function(err, next) {
  if (err) return console.log(err);
  expect(err).to.equal(null);
  expect(next).to.exist;

  // Since changeStream has an implicit session,
  // we need to close the changeStream for unit testing purposes
  changeStream.close();
  client.close();
  done();
});

PHP:

$changeStream = $db->inventory->watch();
$changeStream->rewind();

$firstChange = $changeStream->current();

$changeStream->next();

$secondChange = $changeStream->current();

Motor:

cursor = db.inventory.watch()
document = await cursor.next()

C:

mongoc_collection_t *collection;
bson_t pipeline = BSON_INITIALIZER;
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
bson_iter_t iter;
bson_error_t error;

collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, &pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
   MONGOC_ERROR ("%s\n", error.message);
}

mongoc_change_stream_destroy (stream);

C#:

var enumerator = inventory.Watch().ToEnumerable().GetEnumerator();
enumerator.MoveNext();
var next = enumerator.Current;
enumerator.Dispose();

Ruby:

cursor = inventory.watch.to_enum
next_change = cursor.next

To retrieve the data change event notifications, iterate the change stream cursor.

Note

The lifecycle of an unclosed cursor is language-dependent.
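
In Python, one way to make the cursor's lifetime explicit is to open the change stream in a with block, which closes it on exit. The sketch below is illustrative rather than a prescribed pattern: collect_changes and its limit parameter are hypothetical names, and collection is assumed to be a PyMongo collection whose watch() method returns an iterable context manager.

```python
from itertools import islice


def collect_changes(collection, limit):
    """Return up to `limit` change events from `collection`.

    Assumption: `collection` behaves like a PyMongo Collection, so
    watch() returns a change stream that is both iterable and usable
    as a context manager.
    """
    with collection.watch() as stream:
        # islice stops after `limit` events instead of iterating forever;
        # leaving the with block closes the stream, even on an exception.
        return list(islice(stream, limit))
```

Called as collect_changes(db.inventory, 10), the function blocks until ten events have arrived, so it suits short scripts and tests more than long-running services.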

See Change Events for more information on the change stream response document format.

Modify Change Stream Output

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream: $match, $project, $addFields, $replaceRoot, and $redact.

MongoClient mongoClient = new MongoClient( new MongoClientURI("mongodb://host1:port1,host2:port2..."));

// Select the MongoDB database and collection to open the change stream against

MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");

MongoCollection<Document> collection = db.getCollection("myTargetCollection");

// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
    Document.parse("{'fullDocument.username': 'alice'}"),
    Filters.in("operationType", asList("delete")))));

// Create the change stream cursor, passing the pipeline to the
// collection.watch() method

MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch(pipeline).iterator();

The pipeline list includes a single $match stage that passes only operations where fullDocument.username is alice or where the operationType is delete.

Passing the pipeline to the watch() method directs the change stream to return notifications after passing them through the specified pipeline.
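
For comparison, the same filter can be written for PyMongo as a plain list of stage documents. This is a minimal sketch: build_pipeline and open_filtered_stream are hypothetical helper names, and collection is assumed to be a PyMongo collection.

```python
def build_pipeline(username):
    """Build a one-stage pipeline: match events for `username` or deletes."""
    return [
        {
            "$match": {
                "$or": [
                    {"fullDocument.username": username},
                    {"operationType": {"$in": ["delete"]}},
                ]
            }
        }
    ]


def open_filtered_stream(collection, username):
    # Assumption: `collection` is a PyMongo Collection; watch() accepts
    # the pipeline as its first argument.
    return collection.watch(build_pipeline(username))
```

Keeping the pipeline in its own function makes the filter easy to inspect or unit test without a running deployment.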

const pipeline = [
  { $match: { 'fullDocument.username': 'alice' } },
  { $addFields: { newField: 'this is an added field!' } }
];

const changeStream = collection.watch(pipeline);
changeStream.next(function(err, next) {
  expect(err).to.not.exist;
  expect(next).to.exist;
  expect(next.fullDocument.username).to.equal('alice');
  expect(next.newField).to.exist;
  expect(next.newField).to.equal('this is an added field!');

  // Since changeStream has an implicit session,
  // we need to close the changeStream for unit testing purposes
  changeStream.close();
  client.close();
  done();
});

See Change Events for more information on the change stream response document format.

Lookup Full Document for Update Operations

By default, change streams return only the delta of the fields changed by an update operation. However, you can configure the change stream to return the most current majority-committed version of the updated document.

To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method.

In the example below, all update operation notifications include a full_document field that represents the current version of the document affected by the update operation.

cursor = db.inventory.watch(full_document='updateLookup')
document = next(cursor)

To return the most current majority-committed version of the updated document, pass FullDocument.UPDATE_LOOKUP to the fullDocument() method chained on watch().

In the example below, all update operation notifications include a FullDocument field that represents the current version of the document affected by the update operation.

cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();

To return the most current majority-committed version of the updated document, pass { fullDocument: 'updateLookup' } to the collection.watch() method.

In the example below, all update operation notifications include a fullDocument field that represents the current version of the document affected by the update operation.

const changeStream = collection.watch({ fullDocument: 'updateLookup' });
changeStream.on('change', function(change) {
  expect(change).to.exist;

  // Since changeStream has an implicit session,
  // we need to close the changeStream for unit testing purposes
  changeStream.close();
  client.close();
  done();
});

To return the most current majority-committed version of the updated document, pass 'fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP to the watch() method.

In the example below, all update operation notifications include a fullDocument field that represents the current version of the document affected by the update operation.

$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();

$firstChange = $changeStream->current();

$changeStream->next();

$nextChange = $changeStream->current();

To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method.

In the example below, all update operation notifications include a full_document field that represents the current version of the document affected by the update operation.

cursor = db.inventory.watch(full_document='updateLookup')
document = await cursor.next()

To return the most current majority-committed version of the updated document, pass the "fullDocument" option with the "updateLookup" value to the mongoc_collection_watch method.

In the example below, all update operation notifications include a fullDocument field that represents the current version of the document affected by the update operation.

 BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
 stream = mongoc_collection_watch (collection, &pipeline, &opts);
 mongoc_change_stream_next (stream, &change);
 if (mongoc_change_stream_error_document (stream, &error, NULL)) {
    MONGOC_ERROR ("%s\n", error.message);
 }

 mongoc_change_stream_destroy (stream);

To return the most current majority-committed version of the updated document, pass "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup" to the collection.Watch() method.

In the example below, all update operation notifications include a FullDocument field that represents the current version of the document affected by the update operation.

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var enumerator = inventory.Watch(options).ToEnumerable().GetEnumerator();
enumerator.MoveNext();
var next = enumerator.Current;
enumerator.Dispose();

To return the most current majority-committed version of the updated document, pass full_document: 'updateLookup' to the watch() method.

In the example below, all update operation notifications include a full_document field that represents the current version of the document affected by the update operation.

cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.next

Note

If there are one or more majority-committed operations that modified the updated document after the update operation but before the lookup, the full document returned may differ significantly from the document at the time of the update operation.

However, the deltas included in the change stream document always correctly describe the watched collection changes that applied to that change stream event.
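
To make the distinction concrete, the sketch below separates the delta from the looked-up document in a single update event. It assumes the event is a dictionary that uses the server's field names (operationType, updateDescription, fullDocument); summarize_update is a hypothetical helper, not part of any driver.

```python
def summarize_update(change):
    """Split an update event into its delta and its looked-up document."""
    if change.get("operationType") != "update":
        raise ValueError("expected an update event")
    description = change.get("updateDescription", {})
    return {
        # The delta always describes this specific update.
        "updated_fields": description.get("updatedFields", {}),
        "removed_fields": description.get("removedFields", []),
        # Present only when the stream was opened with updateLookup;
        # it may already reflect later majority-committed writes.
        "full_document": change.get("fullDocument"),
    }
```

If an update sets qty to 5 and a later write sets it to 7 before the lookup runs, updated_fields still reports 5 while full_document can show 7.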

See Change Events for more information on the change stream response document format.

Resume a Change Stream

Change streams are resumable, as long as the oplog has enough history to locate the last operation that the application received.

Each change stream response document has an _id field whose value is a document that serves as the resume token. Passing that _id value back when opening a change stream attempts to resume notifications starting at the operation specified.

In the example below, resume_token contains the change stream notification id. The resume_after modifier takes a parameter that must resolve to a resume token. Passing the resume_token to the resume_after modifier directs the change stream to attempt to resume notifications starting at the operation specified in the resume token.

resume_token = document.get("_id")
cursor = db.inventory.watch(resume_after=resume_token)
document = next(cursor)

In the example below, the resumeToken contains the change stream notification id. The resumeAfter() method takes a parameter that must resolve to a resume token. Passing the resumeToken to the resumeAfter() method directs the change stream to attempt to resume notifications starting at the operation specified in the resume token.

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();

In the example below, resumeToken contains the change stream notification id. The resumeAfter option takes a value that must resolve to a resume token. Passing the resumeToken to the resumeAfter option directs the change stream to attempt to resume notifications starting at the operation specified in the resume token.

let resumeToken;

const changeStream = collection.watch();
changeStream.hasNext(function(err, change) {
  if (err) return console.log(err);
  expect(err).to.equal(null);
  expect(change).to.exist;
  changeStream.next(function(err, change) {
    if (err) return console.log(err);
    expect(err).to.equal(null);

    resumeToken = change._id;

    expect(change._id).to.exist;
    expect(changeStream.resumeToken).to.exist;

    changeStream.close(function(err) {
      if (err) return console.log(err);
      expect(err).to.equal(null);
      const newChangeStream = collection.watch({ resumeAfter: resumeToken });

      newChangeStream.next(function(err, next) {
        if (err) return console.log(err);
        expect(err).to.equal(null);
        expect(next).to.exist;

        // Since changeStream has an implicit session,
        // we need to close the changeStream for unit testing purposes
        newChangeStream.close();
        client.close();
        done();
      });
    });
  });
});

In the example below, $resumeToken contains the change stream notification id. The resumeAfter option takes a value that must resolve to a resume token. Passing the $resumeToken to the resumeAfter option directs the change stream to attempt to resume notifications starting at the operation specified in the resume token.

$resumeToken = ($lastChange !== null) ? $lastChange->_id : null;

if ($resumeToken === null) {
    throw new \Exception('resumeToken was not found');
}

$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();

$nextChange = $changeStream->current();

In the example below, resume_token contains the change stream notification id. The resume_after modifier takes a parameter that must resolve to a resume token. Passing the resume_token to the resume_after modifier directs the change stream to attempt to resume notifications starting at the operation specified in the resume token.

resume_token = document.get("_id")
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()

In the example below, the resumeAfter option is appended to the stream options to recreate the stream after it has been destroyed. Passing the _id to the change stream attempts to resume notifications starting at the operation specified.

 stream = mongoc_collection_watch (collection, &pipeline, NULL);
 if (mongoc_change_stream_next (stream, &change)) {
    bson_iter_init_find (&iter, change, "_id");
    BSON_APPEND_VALUE (&opts, "resumeAfter", bson_iter_value (&iter));

    mongoc_change_stream_destroy (stream);
    stream = mongoc_collection_watch (collection, &pipeline, &opts);
    mongoc_change_stream_next (stream, &change);
    mongoc_change_stream_destroy (stream);
 } else {
    if (mongoc_change_stream_error_document (stream, &error, NULL)) {
       MONGOC_ERROR ("%s\n", error.message);
    }

    mongoc_change_stream_destroy (stream);
 }

In the example below, the resumeToken is retrieved from the last change stream document and passed to the Watch() method as an option. Passing the resumeToken to the Watch() method directs the change stream to attempt to resume notifications starting at the operation specified in the resume token.

  var resumeToken = lastChangeStreamDocument.ResumeToken;
  var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
  var enumerator = inventory.Watch(options).ToEnumerable().GetEnumerator();
  enumerator.MoveNext();
  var next = enumerator.Current;
  enumerator.Dispose();

In the example below, resume_token contains the change stream notification id. The resume_after modifier takes a parameter that must resolve to a resume token. Passing the resume_token to the resume_after modifier directs the change stream to attempt to resume notifications starting at the operation specified in the resume token.

resume_token = next_change['_id']
cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = cursor.next

As long as that operation has not rolled off the oplog, the change stream can successfully resume notifications.
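
In practice, an application usually tracks the token of the last event it handled so that a later run can pick up from there. The following is a minimal sketch under stated assumptions: process_changes, handle, and max_events are hypothetical names, collection is assumed to be a PyMongo collection, and durable storage of the returned token is left to the caller.

```python
from itertools import islice


def process_changes(collection, handle, resume_token=None, max_events=None):
    """Handle events from `collection` and return the last resume token.

    Assumption: `collection` behaves like a PyMongo Collection, whose
    watch() accepts a resume_after keyword argument.
    """
    kwargs = {}
    if resume_token is not None:
        kwargs["resume_after"] = resume_token
    with collection.watch(**kwargs) as stream:
        # islice(stream, None) iterates without a limit.
        for change in islice(stream, max_events):
            handle(change)
            # Record the token only after the event was handled, so an
            # exception in handle() leaves the previous token in place.
            resume_token = change["_id"]
    return resume_token
```

Passing the returned token to the next call resumes after the last handled event, provided that event is still in the oplog.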

Note

Invalidated change streams cannot be resumed. Attempting to resume a change stream against a dropped or renamed collection results in an error.
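
An application can watch for invalidation and stop instead of attempting a resume that will fail. The sketch below assumes that invalidation arrives as an event whose operationType is "invalidate", as described on the Change Events page; consume_until_invalidate is a hypothetical helper that works on any iterable of event dictionaries.

```python
def consume_until_invalidate(stream, handle):
    """Pass events to `handle` until an invalidate event is seen.

    Returns (invalidated, last_token), where last_token is the _id of
    the last event handled before the stream ended or was invalidated.
    """
    last_token = None
    for change in stream:
        if change.get("operationType") == "invalidate":
            # Do not try to resume from here: the stream is invalid.
            return True, last_token
        handle(change)
        last_token = change["_id"]
    return False, last_token
```

When the first element of the result is True, the caller should open a new change stream rather than pass last_token to a resume option.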

Use Cases

Change streams can benefit architectures in which business systems depend on one another, because they inform downstream systems once data changes are durable. For example, change streams can save time for developers when implementing Extract, Transform, and Load (ETL) services, cross-platform synchronization, collaboration functionality, and notification services.

Access Control

For deployments that enforce authentication and authorization, applications can open change streams only against collections to which they have read access.

Event Notification

Change streams only notify on data changes that have persisted to a majority of data-bearing members in the replica set. This ensures that notifications are triggered only by majority-committed changes that are durable in failure scenarios.

For example, consider a 3-member replica set with a change stream cursor opened against the primary. If a client issues an insert operation, the change stream only notifies the application of the data change once that insert has persisted to a majority of data-bearing members.
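
The arithmetic behind the 3-member example can be sketched as follows. This is a simplification that assumes every member is data-bearing and voting; it ignores arbiters and non-voting members, and majority is a hypothetical helper name.

```python
def majority(data_bearing_members):
    """Smallest number of members that forms a strict majority."""
    if data_bearing_members < 1:
        raise ValueError("a replica set needs at least one member")
    return data_bearing_members // 2 + 1
```

Under these assumptions majority(3) is 2, so the insert in the example triggers a notification once the primary and one secondary have persisted it.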