db.collection.watch()

Definition

db.collection.watch(pipeline, options)

Opens a change stream cursor on the collection.

Parameter Type Description
pipeline array

A sequence of one or more aggregation stages that filter or transform the change events, such as the $match stage used in the examples below.

See Aggregation for complete documentation on the aggregation framework.

options document

Optional. Additional options that modify the behavior of watch().

You must pass an empty array [] to the pipeline parameter if you are not specifying a pipeline but are passing the options document.

The options document can contain the following fields and values:

Field Type Description
resumeAfter document

Optional. Directs watch() to attempt to resume notifications starting after the operation specified in the resume token.

Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.

fullDocument string

Optional. By default, watch() returns the delta of those fields modified by an update operation, instead of the entire updated document.

Set fullDocument to "updateLookup" to direct watch() to look up the most current majority-committed version of the updated document. watch() then returns a fullDocument field containing the looked-up document in addition to the updateDescription delta.

batchSize int

Optional. Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster.

Has the same functionality as cursor.batchSize().

maxAwaitTimeMS int

Optional. The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch.

Defaults to 1000 milliseconds.

collation document

Optional. Pass a collation document to specify a collation for the change stream cursor.

Returns: A cursor that remains open as long as a connection to the MongoDB deployment remains open and the collection exists. See Change Events for examples of change event documents.
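
As a minimal sketch of how the pipeline and options parameters fit together, the following wrapper always passes a pipeline array, so that an options document is never read as the pipeline. The name openChangeStream is hypothetical and not part of the shell API, and the option values shown in the comment are arbitrary.

```javascript
// Hypothetical helper (not part of the mongo shell API): always supplies a
// pipeline array, defaulting to [], before the options document.
function openChangeStream(collection, options, pipeline) {
   return collection.watch(pipeline || [], options || {});
}

// In the mongo shell, against a replica set or sharded cluster:
//   let cursor = openChangeStream(
//      db.getSiblingDB("data").sensors,
//      { fullDocument : "updateLookup", batchSize : 100, maxAwaitTimeMS : 500 }
//   );
```

Calling collection.watch([], options) directly is equivalent; the wrapper only guards against omitting the empty array.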

Behavior

watch() requires that the target MongoDB deployment is either a replica set or a sharded cluster with replica set shards. You cannot use change streams on a standalone MongoDB deployment.

You can only use watch() on replica sets or sharded clusters where each data-bearing mongod runs with the WiredTiger storage engine.

watch() only notifies on data changes that have persisted to a majority of data-bearing members in the replica set or shard replica set.
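
The deployment requirements above could be checked before opening a change stream, roughly as sketched below. The helper name looksChangeStreamCapable is hypothetical. It assumes that db.isMaster() reports a setName field on replica set members and msg: "isdbgrid" on a mongos, and that db.serverStatus() reports the storage engine name as "wiredTiger". It is a best-effort check only: from a mongos it cannot verify the storage engine of each shard.

```javascript
// Hypothetical helper: inspects the documents returned by db.isMaster() and
// db.serverStatus() and reports whether the deployment looks eligible.
function looksChangeStreamCapable(isMasterDoc, serverStatusDoc) {
   // A mongos identifies itself with msg: "isdbgrid". The shards' storage
   // engines cannot be checked from here, so this is only a tentative "yes".
   if (isMasterDoc.msg === "isdbgrid") {
      return true;
   }
   let inReplicaSet = typeof isMasterDoc.setName === "string";
   let engine = serverStatusDoc.storageEngine || {};
   return inReplicaSet && engine.name === "wiredTiger";
}

// In the mongo shell:
//   looksChangeStreamCapable(db.isMaster(), db.serverStatus())
```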

Resumability

Unlike the MongoDB 3.6-series drivers, the mongo shell does not automatically attempt to resume a change stream cursor after an error. The drivers make one attempt to automatically resume a change stream cursor after certain errors.

If the operation identified by the resume token passed to the resumeAfter option has already dropped off the cluster’s oplog, watch() cannot resume the change stream.

watch() uses information stored in the oplog about a given data-changing operation to produce the change event description and to generate the resume token associated with that operation. If enough time has passed that the data-changing operation which generated the resume token is no longer present in the oplog, you cannot use that token to resume the change stream.

Attempting to resume a change stream against a dropped collection results in an error.

See Resume a Change Stream for complete documentation on resuming a change stream.
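
Because the shell does not resume automatically, a script has to track the resume token itself. The following is a minimal sketch of one way to do that, not the drivers' actual logic: the helper name watchWithOneResume and the onEvent callback are hypothetical, and unlike the drivers, which resume only after certain errors, this sketch retries once after any error raised while reading from the cursor.

```javascript
// Hypothetical helper: handles change events and, if reading from the cursor
// throws, reopens the stream once after the last event that was handled.
function watchWithOneResume(collection, pipeline, onEvent) {
   let lastToken = null;
   let resumed = false;
   let cursor = collection.watch(pipeline);
   while (!cursor.isExhausted()) {
      let event = null;
      try {
         if (cursor.hasNext()) {
            event = cursor.next();
         }
      } catch (err) {
         if (resumed || lastToken === null) {
            throw err;  // already retried once, or nothing to resume after
         }
         resumed = true;
         cursor = collection.watch(pipeline, { resumeAfter : lastToken });
         continue;
      }
      if (event !== null) {
         onEvent(event);
         lastToken = event._id;  // the _id field is the resume token
      }
   }
}

// In the mongo shell:
//   watchWithOneResume(db.getSiblingDB("data").sensors, [], printjson);
```

The resume still fails if the operation behind lastToken has already dropped off the oplog or if the collection has been dropped.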

Full Document Lookup of Update Operations

The result of a full document lookup may differ from the delta of fields changed by the update operation if other majority-committed operations modified the document between the original update operation and the full document lookup.
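
To illustrate the difference described above, the following sketch compares the delta in an update event with the looked-up document and lists the fields that no longer agree. The helper name divergedFields is hypothetical. It assumes the event carries updateDescription.updatedFields keyed by (possibly dotted) field paths, and it compares values through JSON.stringify, which is a simplification that may not be adequate for BSON types such as ObjectId or Date.

```javascript
// Hypothetical helper: returns the updated field paths whose value in
// fullDocument differs from the value reported in updateDescription.
function divergedFields(event) {
   if (event.operationType !== "update" || !event.fullDocument) {
      return [];
   }
   let updated = (event.updateDescription || {}).updatedFields || {};
   return Object.keys(updated).filter(function (path) {
      // Walk dotted paths such as "reading.temp" into the looked-up document.
      let current = event.fullDocument;
      path.split(".").forEach(function (key) {
         current = (current === null || current === undefined) ? undefined : current[key];
      });
      // Simplified comparison; see the assumptions stated above.
      return JSON.stringify(current) !== JSON.stringify(updated[path]);
   });
}
```

A non-empty result suggests that another majority-committed operation modified those fields between the update and the lookup.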

Depending on the number of changes applied during the update operation and the size of the full document, there is a risk that the size of the change event document for an update operation is greater than the 16MB BSON document limit. If this occurs, the server closes the change stream cursor and returns an error.

Examples

Open a Change Stream

The following operation opens a change stream cursor against the data.sensors collection:

watchCursor = db.getSiblingDB("data").sensors.watch()

Iterate the cursor to check for new events. Use the cursor.isExhausted() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:

while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      // Print each change event as it arrives.
      printjson(watchCursor.next());
   }
}

For complete documentation on change stream output, see Change Events.

Change Stream with Full Document Update Lookup

Set the fullDocument option to "updateLookup" to direct the change stream cursor to look up the most current majority-committed version of the document associated with an update change stream event.

The following operation opens a change stream cursor against the data.sensors collection using the fullDocument : "updateLookup" option:

watchCursor = db.getSiblingDB("data").sensors.watch(
   [],
   { fullDocument : "updateLookup" }
)

Iterate the cursor to check for new events. Use the cursor.isExhausted() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:

while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      // Print each change event, including its fullDocument field.
      printjson(watchCursor.next());
   }
}

For any update operation, the change event returns the result of the document lookup in the fullDocument field.

For an example of the full document update output, see change stream update event.

For complete documentation on change stream output, see Change Events.

Change Stream with Aggregation Pipeline Filter

The following operation opens a change stream cursor against the data.sensors collection using an aggregation pipeline to filter only insert events:

watchCursor = db.getSiblingDB("data").sensors.watch(
   [
      { $match : {"operationType" : "insert" } }
   ]
)

Iterate the cursor to check for new events. Use the cursor.isExhausted() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:

while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      // Print each insert event as it arrives.
      printjson(watchCursor.next());
   }
}

The change stream cursor only returns change events where the operationType is insert. For complete documentation on change stream output, see Change Events.
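
The same approach extends to several operation types at once. The following sketch builds such a pipeline with the $in query operator; the helper name operationTypeFilter is hypothetical, and the operation types in the usage comment are only an example.

```javascript
// Hypothetical helper: builds a pipeline that keeps only change events whose
// operationType is one of the listed values.
function operationTypeFilter(types) {
   return [
      { $match : { operationType : { $in : types } } }
   ];
}

// In the mongo shell:
//   let cursor = db.getSiblingDB("data").sensors.watch(
//      operationTypeFilter([ "insert", "delete" ])
//   );
```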

Resuming a Change Stream

Every document returned by a change stream cursor includes a resume token as the _id field. To resume a change stream, pass the entire _id document of the change event you want to resume from to the resumeAfter option of watch().

The following operation resumes a change stream cursor against the data.sensors collection using a resume token. This assumes that the operation that generated the resume token has not rolled off the cluster’s oplog.

let watchCursor = db.getSiblingDB("data").sensors.watch();
let firstChange;

while (!watchCursor.isExhausted()) {
   if (watchCursor.hasNext()) {
     firstChange = watchCursor.next();
     break;
   }
}

watchCursor.close();

let resumeToken = firstChange._id;

let resumedWatchCursor = db.getSiblingDB("data").sensors.watch(
   [],
   { resumeAfter : resumeToken }
)

Iterate the cursor to check for new events. Use the cursor.isExhausted() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:

while (!resumedWatchCursor.isExhausted()){
   if (resumedWatchCursor.hasNext()){
      // Print each change event that follows the resume token.
      printjson(resumedWatchCursor.next());
   }
}

See Resume a Change Stream for complete documentation on resuming a change stream.