Navigation

Mongo.watch()

Definition

Mongo.watch(pipeline, options)

New in version 4.0: Requires featureCompatibilityVersion (fCV) set to "4.0" or greater. For more information on fCV, see setFeatureCompatibilityVersion.

Opens a change stream cursor for a replica set or a sharded cluster to report on all its non-system collections across its databases, with the exception of the admin, local, and the config databases.

Parameter Type Description
pipeline array

A sequence of one or more of the following aggregation stages:

See Aggregation for complete documentation on the aggregation framework.

options document

Optional. Additional options that modify the behavior of Mongo.watch().

You must pass an empty array [] to the pipeline parameter if you are not specifying a pipeline but are passing the options document.

The options document can contain the following fields and values:

Field Type Description
resumeAfter document

Optional. Directs Mongo.watch() to attempt resuming notifications starting after the operation specified in the resume token.

Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.

resumeAfter is mutually exclusive with startAtOperationTime.

fullDocument string

Optional. By default, Mongo.watch() returns the delta of those fields modified by an update operation, instead of the entire updated document.

Set fullDocument to "updateLookup" to direct Mongo.watch() to look up the most current majority-committed version of the updated document. Mongo.watch() returns a fullDocument field with the document lookup in addition to the updateDescription delta.

batchSize int

Optional. Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster.

Has the same functionality as cursor.batchSize().

maxAwaitTimeMS int

Optional. The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch.

Defaults to 1000 milliseconds.

collation document Optional. Pass a collation document to specify a collation for the change stream cursor.
startAtOperationTime Timestamp

Optional. The starting point for the change stream. If the specified starting point is in the past, it must be in the time range of the oplog. To check the time range of the oplog, see rs.printReplicationInfo().

startAtOperationTime is mutually exclusive with resumeAfter.

Returns:A cursor over the change event documents. See Change Events for examples of change event documents.

Behavior

  • Mongo.watch() only notifies on data changes that have persisted to a majority of data-bearing members.
  • The change stream cursor remains open until one of the following occurs:
    • The cursor is explicitly closed.
    • An invalidate event occurs; for example, a collection drop or rename.
    • The connection to the MongoDB deployment is closed.
    • If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.
  • Mongo.watch() is available for replica sets and sharded clusters:
  • You can only use Mongo.watch() with the Wired Tiger storage engine.

Resumability

Unlike the MongoDB drivers, the mongo shell does not automatically attempt to resume a change stream cursor after an error. The MongoDB drivers make one attempt to automatically resume a change stream cursor after certain errors.

Mongo.watch() uses information stored in the oplog to produce the change event description and generate a resume token associated to that operation. If the operation identified by the resume token passed to the resumeAfter option has already dropped off the oplog, Mongo.watch() cannot resume the change stream.

See Resume a Change Stream for more information on resuming a change stream.

Note

  • You cannot resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream.
  • If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.

If the featureCompatibilityVersion (fcv) is set to "4.0" or greater, newly opened change streams return a hex-encoded string for the resume token data, i.e. the _id._data value. This change allows for the ability to compare and sort the resume tokens. If the fcv is 3.6, newly opened change streams return a BinData for the resume token data.

Important

The fcv value at the time of the cursor’s opening determine the resume token data type. That is, the modification of the fcv does not affect the resume tokens for change streams already opened before the fcv change.

Regardless of the fcv value, a 4.0 replica set or a sharded cluster can resume a change stream using either the BinData or string resume token.

Full Document Lookup of Update Operations

By default, the change stream cursor returns specific field changes/deltas for update operations. You can also configure the change stream to look up and return the current majority-committed version of the changed document. Depending on other write operations that may have occurred between the update and the lookup, the returned document may differ significantly from the document at the time of the update.

Depending on the number of changes applied during the update operation and the size of the full document, there is a risk that the size of the change event document for an update operation is greater than the 16MB BSON document limit. If this occurs, the server closes the change stream cursor and returns an error.

Availability

Change stream is only available if "majority" read concern support is enabled (default).

Access Control

When running with access control, the user must have the find and changeStream privilege actions on all non-systems collections across all databases.. That is, a user must have a role that grants the following privilege:

{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

The built-in read role provides the appropriate privileges.

Example

The following operation in the mongo shell opens a change stream cursor on a replica set. The returned cursor reports on data changes to all the non-system collections across all databases except for the admin, local, and the config databases.

watchCursor = db.getMongo().watch()

Iterate the cursor to check for new events. Use the cursor.isExhausted() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:

while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      printjson(watchCursor.next());
   }
}

For complete documentation on change stream output, see Change Events.