Change Streams¶
As of MongoDB server version 3.6, the aggregation framework supports a $changeStream pipeline stage. Specifying this stage first in an aggregation pipeline requests notifications for all changes to a particular collection. As of MongoDB 4.0, change streams are supported on databases and clusters in addition to collections.
The Ruby driver provides an API for receiving change notifications for a particular collection, database, or cluster using this pipeline stage. Although you can create a change stream directly with the pipeline operator and the aggregation framework, the driver API described below is recommended: the driver resumes the change stream once after a timeout, a network error, a server error indicating that a failover is taking place, or another resumable error.
Change streams on the server require a "majority" read concern or no read concern.
Change streams do not work properly with JRuby because of a known JRuby issue. Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread, so calling #next on the change stream causes getMore commands to be issued in a loop in the background.
Watching for Changes on a Collection¶
A collection change stream is created by calling the #watch method on a collection.
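As a minimal sketch of creating a collection change stream and reading one change from it: the helper name first_change is hypothetical, and the connection details in the comments (host, database and collection names) are assumptions rather than required values. The helper only relies on #watch, #to_enum and #close.

```ruby
# Assumed setup in a real application (requires the mongo gem and a replica set):
#   require 'mongo'
#   client = Mongo::Client.new(['127.0.0.1:27017'], database: 'test')
#   collection = client[:test]

# Hypothetical helper: opens a change stream on the collection and returns
# the first change document. `collection` is expected to behave like a
# Mongo::Collection.
def first_change(collection)
  stream = collection.watch
  stream.to_enum.next   # blocks until a change is available
ensure
  stream&.close         # release the server-side cursor
end
```

Because #next blocks, the write that produces the change (for example an insert_one on the same collection) has to come from another thread or process once the stream is open.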
You can also receive notifications as they become available.
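One way to consume notifications as they arrive is to iterate the stream with each, which the change stream supports as an Enumerable. The helper name each_change below is hypothetical; this is a sketch, not the only way to iterate.

```ruby
# Hypothetical helper: yields every change on the collection to the caller's
# block as it becomes available. Against a real server the iteration does not
# end on its own; it runs until an error is raised or the block breaks out.
def each_change(collection)
  stream = collection.watch
  stream.each do |change|
    yield change
  end
ensure
  stream&.close
end
```

A caller might use it as `each_change(collection) { |change| puts change['operationType'] }`.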
The next method blocks and polls the cluster until a change is available. Use the try_next method to iterate a change stream without blocking; this method waits up to max_await_time_ms milliseconds for changes from the server and returns nil if none are received. If there is a non-resumable error, both next and try_next raise an exception.
See the Resuming a Change Stream section below for an example that reads changes from a collection indefinitely.
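The non-blocking behavior of try_next described above can be sketched as a bounded polling loop. The helper name poll_changes, the max_polls parameter and the 500 ms default are assumptions chosen for illustration.

```ruby
# Hypothetical helper: polls the change stream a fixed number of times and
# returns whatever changes arrived. Each try_next call waits at most
# max_await_time_ms milliseconds on the server before returning nil.
def poll_changes(collection, max_polls:, max_await_time_ms: 500)
  stream = collection.watch([], max_await_time_ms: max_await_time_ms)
  changes = []
  max_polls.times do
    change = stream.try_next
    changes << change if change   # nil means no change arrived in time
  end
  changes
ensure
  stream&.close
end
```

Between polls the application is free to do other work, which is the main reason to prefer try_next over next.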
A change stream accepts filters expressed as aggregation pipeline stages.
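For instance, a filter pipeline might restrict the stream to insert and replace operations on documents matching a condition. In this sketch the constant and helper names are hypothetical, and the document field n is an assumed example field.

```ruby
# Hypothetical pipeline: only insert and replace events, and only for
# documents whose (assumed) field `n` is at least 1.
INSERT_OR_REPLACE_PIPELINE = [
  { '$match' => { 'operationType' => { '$in' => %w[insert replace] } } },
  { '$match' => { 'fullDocument.n' => { '$gte' => 1 } } },
].freeze

# Hypothetical helper: opens a change stream that applies the filters above.
def watch_filtered(collection)
  collection.watch(INSERT_OR_REPLACE_PIPELINE)
end
```

The returned stream is iterated in the same way as an unfiltered one; filtered-out events are never delivered to the application.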
Watching for Changes on a Database¶
A database change stream reports changes to any collection within the database, as well as database-wide events such as the database being dropped.
A database change stream is created by calling the #watch method on a database object.
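A minimal sketch, assuming the database object is obtained from an existing client through its database accessor; the helper name watch_database is hypothetical.

```ruby
# Hypothetical helper: opens a change stream covering every collection in the
# client's current database. `client` is expected to behave like a
# Mongo::Client, whose #database method returns the database object.
def watch_database(client)
  database = client.database
  database.watch
end
```

The resulting stream is read with next, try_next or each, exactly like a collection change stream.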
Watching for Changes on a Cluster¶
A cluster change stream reports changes to any collection in any database within the cluster, as well as cluster-wide events.
A cluster change stream is created by calling the #watch method on a client object (not the cluster object).
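Since a cluster change stream spans databases, it is often useful to read the namespace from each event. The sketch below assumes the standard change event fields operationType and ns (with db and coll subfields); both helper names are hypothetical.

```ruby
# Hypothetical helper: summarizes a change event as "<operation> on <db>.<coll>".
def describe_change(change)
  ns = change['ns'] || {}
  "#{change['operationType']} on #{ns['db']}.#{ns['coll']}"
end

# Hypothetical helper: watches the whole cluster through the client object and
# yields a short description of every change.
def watch_cluster(client)
  stream = client.watch
  stream.each { |change| yield describe_change(change) }
ensure
  stream&.close
end
```

Cluster-wide events that are not tied to a single collection may lack a coll value, so a real application would likely handle that case explicitly.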
Closing a Change Stream¶
You can close a change stream by calling its #close method.
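To make sure #close is called even when processing raises, the stream can be wrapped in a block helper. This is a sketch of one possible pattern; the helper name with_change_stream is hypothetical and not part of the driver.

```ruby
# Hypothetical helper: opens a change stream, hands it to the block, and
# closes it afterwards regardless of how the block exits.
def with_change_stream(collection)
  stream = collection.watch
  yield stream
ensure
  stream&.close
end
```

For example, `with_change_stream(collection) { |stream| stream.try_next }` reads at most one change and then closes the stream.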
Resuming a Change Stream¶
The driver automatically retries getMore operations on a change stream once. The initial aggregation is never retried. In practical terms this means that, for example:
- Calling collection.watch will fail if the cluster does not have enough available nodes to satisfy the "majority" read concern.
- Once collection.watch successfully returns, if the cluster subsequently experiences an election or loses a node, but heals quickly enough, change stream reads via the next or each methods will continue transparently to the application.
If the cluster loses enough nodes that it cannot satisfy the "majority" read concern and does not heal quickly enough, next and each will raise an error. In these cases the application must track, via the resume token, which documents from the change stream it has processed, and create a new change stream object via the watch call, passing an appropriate :resume_after argument. The _id key in each change document can be used as a resume token. However, to get the most up-to-date resume token, use the resume_token method.
A collection can be watched indefinitely using next, retrying on all MongoDB errors.
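The retry strategy described above can be sketched as follows. The helper name watch_forever and its error_class and retry_delay parameters are hypothetical; in a real application error_class would typically be Mongo::Error, the base class of the driver's errors, and the one-second delay is an arbitrary assumption.

```ruby
# Hypothetical helper: reads changes indefinitely, re-creating the change
# stream from the last resume token whenever an error of error_class occurs.
def watch_forever(collection, error_class:, retry_delay: 1)
  token = nil
  loop do
    begin
      # Only pass :resume_after once a token has been recorded.
      options = token ? { resume_after: token } : {}
      stream = collection.watch([], options)
      enum = stream.to_enum
      while (change = enum.next)       # blocks until a change is available
        yield change
        token = stream.resume_token    # remember progress after processing
      end
    rescue error_class
      sleep retry_delay                # back off, then open a new stream
    end
  end
end
```

Recording the token only after the block returns means a change whose processing was interrupted is delivered again after a resume, so the processing step should tolerate duplicates.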