Change Streams¶
New in version 3.6.
Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a collection and immediately react to them.
Important
Change streams are only available if "majority" read concern support is enabled (the default).
Open A Change Stream¶
You can only open a change stream against replica sets or sharded clusters. For a sharded cluster, you must issue the open change stream operation against the mongos.
The replica set or the sharded cluster must use replica set protocol version 1 (pv1) and the WiredTiger storage engine (which can be encrypted).
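As a rough pre-flight check, a client can look at the server's isMaster reply before opening a change stream: replica set members include a setName field, and a mongos reports msg: "isdbgrid". The sketch below uses a hypothetical helper (can_open_change_stream is not a driver API) that only inspects an already-fetched reply dictionary, for example one returned by PyMongo's client.admin.command("isMaster"). It does not check the protocol version or the storage engine.

```python
from typing import Any, Mapping


def can_open_change_stream(reply: Mapping[str, Any]) -> bool:
    """Return True if an isMaster reply comes from a deployment type that
    supports change streams (a replica set member or a mongos).

    Hypothetical helper: it checks topology only, not pv1 or WiredTiger.
    """
    if "setName" in reply:               # member of a replica set
        return True
    if reply.get("msg") == "isdbgrid":   # mongos in front of a sharded cluster
        return True
    return False                         # standalone server
```

Note that each shard of a sharded cluster is itself a replica set, so a True result for a reply with setName does not show that you are connected to the mongos; for a sharded cluster, connect to the mongos as described above.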
The following example opens a change stream for a collection and iterates over the cursor to retrieve the change stream documents. While the connection to the MongoDB deployment remains open, the cursor remains open until one of the following occurs:
- The cursor is explicitly closed.
- An invalidate event occurs.
- In a sharded cluster, a shard is removed. Shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.
The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection. With the Node.js driver, you can process the change events either as a stream or with an iterator.
To retrieve the data change event notifications, iterate the change stream cursor.
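A minimal Python sketch of iterating the cursor, assuming PyMongo, where collection.watch() returns a ChangeStream that can be used as a context manager and iterated. The helper collect_changes is hypothetical; it accepts any iterable of change documents so it can be tried without a server, stops after max_events documents, and stops early on an invalidate event.

```python
from typing import Any, Dict, Iterable, List


def collect_changes(stream: Iterable[Dict[str, Any]], max_events: int) -> List[Dict[str, Any]]:
    """Collect up to max_events change documents from an open change stream.

    Stops early on an invalidate event, because the server closes the
    change stream cursor after sending it.
    """
    events: List[Dict[str, Any]] = []
    if max_events <= 0:
        return events
    for change in stream:  # on a real ChangeStream this blocks until the next event
        events.append(change)
        if change.get("operationType") == "invalidate":
            break
        if len(events) >= max_events:
            break
    return events
```

With a live deployment you might call it as `with db.inventory.watch() as stream: events = collect_changes(stream, 10)`. Leaving the with block closes the cursor explicitly, which is one of the conditions that ends a change stream.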
Note
The lifecycle of an unclosed cursor is language-dependent.
See Change Events for more information on the change stream response document format.
Modify Change Stream Output¶
You can control change stream output by providing an array of one or more pipeline stages, such as $match, when configuring the change stream.
For example, the pipeline list might include a single $match stage that passes through only operations where the username is alice, or operations where the operationType is delete.
Passing the pipeline to the watch() method directs the change stream to return notifications after passing them through the specified pipeline.
See Change Events for more information on the change stream response document format.
Lookup Full Document for Update Operations¶
By default, change streams only return the delta of fields during the update operation. However, you can configure the change stream to return the most current majority-committed version of the updated document.
In Python, to return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method. With this option, all update operation notifications include a fullDocument field that represents the current version of the document affected by the update operation.
In Java (Sync), to return the most current majority-committed version of the updated document, pass FullDocument.UPDATE_LOOKUP to the fullDocument() method of the iterable returned by collection.watch(). With this option, all update operation notifications include a full document (available through getFullDocument()) that represents the current version of the document affected by the update operation.
In Node.js, to return the most current majority-committed version of the updated document, pass { fullDocument: 'updateLookup' } as the options argument to the collection.watch() method. With this option, all update operation notifications include a fullDocument field that represents the current version of the document affected by the update operation.
In PHP, to return the most current majority-committed version of the updated document, pass 'fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP in the options array of the watch() method. With this option, all update operation notifications include a fullDocument field that represents the current version of the document affected by the update operation.
In Motor, to return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method. With this option, all update operation notifications include a fullDocument field that represents the current version of the document affected by the update operation.
In C, to return the most current majority-committed version of the updated document, pass the "fullDocument" option with the "updateLookup" value to the mongoc_collection_watch() function. With this option, all update operation notifications include a fullDocument field that represents the current version of the document affected by the update operation.
In C#, to return the most current majority-committed version of the updated document, set FullDocument = ChangeStreamFullDocumentOption.UpdateLookup in the ChangeStreamOptions passed to the collection.Watch() method. With this option, all update operation notifications include a FullDocument property that represents the current version of the document affected by the update operation.
In Ruby, to return the most current majority-committed version of the updated document, pass full_document: 'updateLookup' to the watch() method. With this option, all update operation notifications include a fullDocument field that represents the current version of the document affected by the update operation.
Note
If there are one or more majority-committed operations that modified the updated document after the update operation but before the lookup, the full document returned may differ significantly from the document at the time of the update operation.
However, the deltas included in the change stream document always correctly describe the watched collection changes that applied to that change stream event.
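To make the delta-versus-full-document distinction concrete, the sketch below uses a hypothetical helper and assumes PyMongo change documents arrive as plain dictionaries. It pulls both parts out of an update event: updateDescription carries the updatedFields and removedFields delta, and fullDocument is present only when the stream was opened with full_document='updateLookup'.

```python
from typing import Any, Dict, Optional, Tuple


def split_update_event(change: Dict[str, Any]) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]]]:
    """Return (delta, full_document) for an update change event.

    delta: the updateDescription (updatedFields / removedFields), which
    describes exactly this update.
    full_document: the looked-up document, or None when the stream was not
    opened with updateLookup (or the document no longer exists).
    """
    if change.get("operationType") != "update":
        raise ValueError("not an update event")
    return change["updateDescription"], change.get("fullDocument")
```

With PyMongo you might open the stream as db.inventory.watch(full_document='updateLookup') and pass each update event to split_update_event. The delta always describes this event, while the looked-up document may already reflect later majority-committed writes.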
See Change Events for more information on the change stream response document format.
Resume a Change Stream¶
Change streams are resumable by specifying a resumeAfter token when opening the cursor. For the resumeAfter token, use the _id value of the change stream event document. Passing the _id value to the change stream attempts to resume notifications starting after the specified operation.
Important
- The oplog must have enough history to locate the operation associated with the token or the timestamp, if the timestamp is in the past.
- You cannot resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream.
In Python, resume_token contains the change stream notification id. The resume_after parameter of watch() takes a value that must resolve to a resume token. Passing resume_token to resume_after directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.
In Java (Sync), resumeToken contains the change stream notification id. The resumeAfter() method takes a parameter that must resolve to a resume token. Passing resumeToken to the resumeAfter() method directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.
In Node.js, resumeToken contains the change stream notification id. The resumeAfter option takes a value that must resolve to a resume token. Passing resumeToken to the resumeAfter option directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.
In PHP, $resumeToken contains the change stream notification id. The resumeAfter option takes a value that must resolve to a resume token. Passing $resumeToken to the resumeAfter option directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.
In Motor, resume_token contains the change stream notification id. The resume_after parameter takes a value that must resolve to a resume token. Passing resume_token to resume_after directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.
In C, the resumeAfter option is appended to the stream options to recreate the stream after it has been destroyed. Passing the _id to the change stream attempts to resume notifications starting after the operation specified.
In C#, resumeToken is retrieved from the last change stream document and passed to the Watch() method as an option. Passing resumeToken to the Watch() method directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.
In Ruby, resume_token contains the change stream notification id. The resume_after option takes a value that must resolve to a resume token. Passing resume_token to the resume_after option directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.
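A minimal sketch of the resume pattern in Python, assuming PyMongo's watch() accepts the token through its resume_after keyword argument. ResumeTracker is a hypothetical helper: it records the _id of each processed event and produces the keyword arguments for the next watch() call. It does not handle invalidate events, after which resuming is not possible.

```python
from typing import Any, Dict, Iterable, Optional


class ResumeTracker:
    """Remember the _id (resume token) of the last processed change event.

    Hypothetical helper: after a disconnect, pass watch_kwargs() to
    collection.watch() so the new stream resumes after the last event.
    """

    def __init__(self) -> None:
        self.token: Optional[Dict[str, Any]] = None

    def process(self, stream: Iterable[Dict[str, Any]]) -> int:
        count = 0
        for change in stream:
            # ... handle the event here, then record its token ...
            self.token = change["_id"]  # the event's _id is its resume token
            count += 1
        return count

    def watch_kwargs(self) -> Dict[str, Any]:
        # No token yet: open a fresh stream. Otherwise resume after the token.
        return {} if self.token is None else {"resume_after": self.token}
```

For example, create `tracker = ResumeTracker()` once, then after each disconnect run `with db.inventory.watch(**tracker.watch_kwargs()) as stream: tracker.process(stream)`. If the stream raises, the tracker still holds the token of the last event that was fully handled.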
Use Cases¶
Change streams can benefit architectures with interdependent business systems by informing downstream systems once data changes are durable. For example, change streams can save developers time when implementing Extract, Transform, and Load (ETL) services, cross-platform synchronization, collaboration functionality, and notification services.
Access Control¶
For deployments enforcing authentication and authorization, authenticate as a user with the changeStream and find privilege actions on the collection against which you want to open a change stream.
The read built-in role includes the required privileges to support opening a change stream against a collection. Any built-in role or user-defined role that inherits the read role can also support opening change streams against a collection.
Alternatively, use db.createRole() to create a user-defined role that grants the changeStream and find privilege actions on the target collection.
See User-Defined Roles for more complete documentation.
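The same kind of user-defined role can be created from Python. This sketch assumes PyMongo, whose Database.command() builds the command document from the command name, its value, and any extra keyword arguments; the helper functions and the role name you pass are hypothetical. It mirrors what db.createRole() does in the mongo shell.

```python
from typing import Any, Dict, List


def change_stream_privileges(db_name: str, collection: str) -> List[Dict[str, Any]]:
    """Privileges granting the changeStream and find actions on one collection."""
    return [
        {
            "resource": {"db": db_name, "collection": collection},
            "actions": ["changeStream", "find"],
        }
    ]


def create_change_stream_role(db: Any, role_name: str, collection: str) -> Any:
    """Run createRole against `db` (assumed to be a PyMongo Database).

    Sends {createRole: role_name, privileges: [...], roles: []}.
    """
    return db.command(
        "createRole",
        role_name,
        privileges=change_stream_privileges(db.name, collection),
        roles=[],  # no inherited roles
    )
```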
To associate a built-in role or user-defined role with an existing user, use the db.grantRolesToUser() or db.updateUser() methods. You can also specify the role when creating a new user with db.createUser().
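As a sketch of granting a role from Python, the hypothetical helper below builds a grantRolesToUser command document that could be passed to PyMongo's Database.command(). Roles can be plain names, which resolve in the database the command runs against, or {"role": ..., "db": ...} documents; the user and role names in the usage are hypothetical.

```python
from typing import Any, Dict, List, Union

Role = Union[str, Dict[str, str]]


def grant_roles_command(user: str, roles: List[Role]) -> Dict[str, Any]:
    """Build a grantRolesToUser command document.

    The command name must be the first key; Python dicts keep insertion order.
    """
    return {"grantRolesToUser": user, "roles": list(roles)}
```

For example, `db.command(grant_roles_command("reporter", ["read"]))` would give a hypothetical user named reporter the built-in read role on that database.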
Event Notification¶
Change streams only notify on data changes that have persisted to a majority of data-bearing members in the replica set. This ensures that notifications are triggered only by majority-committed changes that are durable in failure scenarios.
For example, consider a 3-member replica set with a change stream cursor opened against the primary. If a client issues an insert operation, the change stream only notifies the application of the data change once that insert has persisted to a majority of data-bearing members.
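The arithmetic behind "a majority of data-bearing members" can be modeled in a few lines. The sketch below is a simplification with hypothetical function names: it ignores member votes, arbiters, and how acknowledgement is actually tracked. For the 3-member example, majority(3) is 2, so the insert is reported once it exists on the primary and one secondary.

```python
def majority(data_bearing_members: int) -> int:
    """Smallest number of data-bearing members that forms a majority."""
    if data_bearing_members < 1:
        raise ValueError("need at least one data-bearing member")
    return data_bearing_members // 2 + 1


def is_notified(acknowledged: int, data_bearing_members: int) -> bool:
    """Simplified rule: the change is reported once a majority has it."""
    return acknowledged >= majority(data_bearing_members)
```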