Navigation

Change Stream Examples

New in version 3.6.

Important

Change streams are available for replica sets or sharded clusters with replica set shards. You cannot open a change stream against a standalone mongod. For a sharded cluster, you must issue the open change stream operation against the mongos.

The replica set or the sharded cluster must use:

The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

Open A Change Stream

This example opens a change stream against a replica set. The change stream is bound to a collection and change stream documents are iterated with a cursor. This cursor remains open until it is explicitly closed, as long as a connection to the MongoDB deployment remains open and the collection exists.

    cursor = db.inventory.watch()
    document = next(cursor)
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();
   mongoc_collection_t *collection;
   bson_t pipeline = BSON_INITIALIZER;
   bson_t opts = BSON_INITIALIZER;
   mongoc_change_stream_t *stream;
   const bson_t *change;
   bson_iter_t iter;
   bson_error_t error;

   collection = mongoc_database_get_collection (db, "inventory");
   stream = mongoc_collection_watch (collection, &pipeline, NULL /* opts */);
   mongoc_change_stream_next (stream, &change);
   if (mongoc_change_stream_error_document (stream, &error, NULL)) {
      MONGOC_ERROR ("%s\n", error.message);
   }

   mongoc_change_stream_destroy (stream);

In order to retrieve the data change event notifications, iterate the change stream cursor.

Note

The lifecycle of an unclosed cursor is language-dependent.

See Change Events for more information on the change stream response document format.

Lookup Full Document for Update Operations

By default, change streams only return the delta of fields during the update operation. To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method.

By default, change streams only return the delta of fields during the update operation. To return the most current majority-committed version of the updated document, pass FullDocument.UPDATE_LOOKUP to the db.collection.watch.fullDocument() method.

By default, change streams only return the delta of fields during the update operation. To return the most current majority-committed version of the updated document, pass the "fullDocument" option with the "updateLookup" value to the mongoc_collection_watch method.

If there are one or more majority-committed operations that modified the updated document after the update operation but before the lookup, the full document returned may differ significantly from the document at the time of the update operation.

However, the deltas included in the change stream document always correctly describe the watched collection changes that applied to that change stream event.

Iterate the change stream cursor to retrieve the change stream documents in order. In the example below, all update operations notifications include a fullDocument field that represents the current version of the document affected by the update operation.

    cursor = db.inventory.watch(full_document='updateLookup')
    document = next(cursor)
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();
   BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
   stream = mongoc_collection_watch (collection, &pipeline, &opts);
   mongoc_change_stream_next (stream, &change);
   if (mongoc_change_stream_error_document (stream, &error, NULL)) {
      MONGOC_ERROR ("%s\n", error.message);
   }

   mongoc_change_stream_destroy (stream);

See Change Events for more information on the change stream response document format.

Modify Change Stream Output using Aggregation Pipelines

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

MongoClient mongoClient = new MongoClient( new MongoClientURI("mongodb://host1:port1,host2:port2..."));

// Select the MongoDB database and collection to open the change stream against

MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");

MongoCollection<Document> collection = database.getCollection("myTargetCollection");

// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
    Document.parse("{'fullDocument.username': 'alice'}"),
    Filters.in("operationType", asList("delete")))));

// Create the change stream cursor, passing the pipeline to the
// collection.watch() method

MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

The pipeline list includes a single $match stage that filters any operations where the username is alice, or operations where the operationType is delete.

Passing the pipeline to the watch() method directs the change stream to return notifications after passing them through the specified pipeline.

See Change Events for more information on the change stream response document format.

Resume a Change Stream

Each change stream response document has an _id field that contains a document containing a resume token. The resumeToken includes the change stream notification id. You can use the resumeToken document to resume notification after a specific notification.

In the example below, the resumeAfter modifier takes a parameter that must resolve to a resume token. Passing the resumeToken to the resumeAfter modifier directs the change stream to attempt to resume notifications starting at the operation specified in the resume token.

    resume_token = document.get("_id")
    cursor = db.inventory.watch(resume_after=resume_token)
    document = next(cursor)

In the example below, the resumeAfter() method takes a parameter that must resolve to a resume token. Passing the resumeToken to the resumeAfter() method directs the change stream to attempt to resume notifications starting at the operation specified in the resume token.

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();

In the example below, the resumeAfter option is appended to the stream options to recreate the stream after it has been destroyed. Passing the ``_id` to the change stream attempts to resume notifications starting at the operation specified.

   stream = mongoc_collection_watch (collection, &pipeline, NULL);
   if (mongoc_change_stream_next (stream, &change)) {
      bson_iter_init_find (&iter, change, "_id");
      BSON_APPEND_VALUE (&opts, "resumeAfter", bson_iter_value (&iter));

      mongoc_change_stream_destroy (stream);
      stream = mongoc_collection_watch (collection, &pipeline, &opts);
      mongoc_change_stream_next (stream, &change);
      mongoc_change_stream_destroy (stream);
   } else {
      if (mongoc_change_stream_error_document (stream, &error, NULL)) {
         MONGOC_ERROR ("%s\n", error.message);
      }

      mongoc_change_stream_destroy (stream);
   }

As long as that operation has not rolled off the oplog, the change stream can successfully resume notifications.

Note

Invalidated change streams cannot be resumed. Attempting to resume a change stream against a dropped or renamed collection results in an error.

See Change Events for more information on the change stream response document format.