Change Streams

MongoDB server version 3.6 and later support the $changeStream pipeline stage in the aggregation framework. Specifying this stage first in an aggregation pipeline requests notifications for all changes to a particular collection. As of MongoDB 4.0, change streams are supported on databases and clusters in addition to collections.

The Ruby driver provides an API for receiving notifications for changes to a particular collection, database, or cluster using this pipeline stage. Although you can create a change stream directly with the $changeStream stage and the aggregation framework, the driver API described below is recommended: the driver resumes the change stream once if it encounters a timeout, a network error, a server error indicating that a failover is taking place, or another type of resumable error.

Change streams on the server require a "majority" read concern or no read concern.

Change streams do not work properly with JRuby because of a known JRuby issue: JRuby eagerly evaluates #next on an Enumerator in a background green thread, so calling #next on the change stream causes getMore commands to be issued in a loop in the background.

Watching for Changes on a Collection

A collection change stream is created by calling the #watch method on a collection:

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
collection = client[:test]
stream = collection.watch
collection.insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

You can also receive the notifications as they become available:

stream = collection.watch
enum = stream.to_enum
while doc = enum.next
  process(doc)
end

The next method blocks and polls the cluster until a change is available. If a non-resumable error occurs, next raises an exception. See the Resuming a Change Stream section below for an example that reads changes from a collection indefinitely.
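The examples on this page call process(doc) without defining it. As a minimal sketch, assuming the change documents carry the server's standard operationType, documentKey, fullDocument, and updateDescription fields, a handler might dispatch on the operation type. The name describe_change is hypothetical and is not part of the driver.

```ruby
# Hypothetical helper (not part of the driver): summarizes one change
# document. Works on any Hash-like object with string keys.
def describe_change(doc)
  key = doc['documentKey']
  case doc['operationType']
  when 'insert', 'replace'
    # Inserts and replaces carry the new document in fullDocument.
    "#{doc['operationType']} #{key.inspect}: #{doc['fullDocument'].inspect}"
  when 'update'
    # Updates describe the modified fields in updateDescription.
    "update #{key.inspect}: #{doc['updateDescription'].inspect}"
  when 'delete'
    "delete #{key.inspect}"
  else
    # For example 'drop' or 'invalidate', which carry no documentKey.
    "other event: #{doc['operationType']}"
  end
end
```

A process method could, for instance, log the string returned by describe_change(doc) before doing application-specific work.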

A change stream accepts filters expressed as aggregation pipeline stages:

stream = collection.watch([{'$match' => { 'operationType' => {'$in' => ['insert', 'replace'] } } },
                           {'$match' => { 'fullDocument.n' => { '$gte' => 1 } } }
                          ])
enum = stream.to_enum
while doc = enum.next
  process(doc)
end
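Because the filter is an ordinary array of hashes, it can be assembled before the stream is opened. The following is a sketch only: change_filter is a hypothetical helper, not a driver method, and it assumes every field condition is a simple lower bound on a field of fullDocument.

```ruby
# Hypothetical helper: builds a change stream filter pipeline as plain
# Ruby data. Nothing here talks to the server.
def change_filter(operation_types, field_minimums = {})
  pipeline = [
    { '$match' => { 'operationType' => { '$in' => operation_types } } }
  ]
  field_minimums.each do |field, minimum|
    # Fields of the changed document are addressed under fullDocument.
    pipeline << { '$match' => { "fullDocument.#{field}" => { '$gte' => minimum } } }
  end
  pipeline
end

# Builds the same two-stage pipeline as the example above.
pipeline = change_filter(%w[insert replace], { 'n' => 1 })
```

The resulting array could then be passed as collection.watch(pipeline).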

Watching for Changes on a Database

A database change stream reports changes to any collection within the database, as well as database-wide events such as the database being dropped.

A database change stream is created by calling the #watch method on a database object:

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
database = client.database
stream = database.watch
client[:test].insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

Watching for Changes on a Cluster

A cluster change stream reports changes to any collection in any database within the cluster, as well as cluster-wide events.

A cluster change stream is created by calling the #watch method on a client object (not the cluster object):

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
stream = client.watch
client[:test].insert_one(a: 1)
doc = stream.to_enum.next
process(doc)
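Database and cluster streams deliver changes from many collections, so a consumer usually needs to know where each change came from. The sketch below assumes that change documents carry the server's ns field with db and coll keys; change_namespace is a hypothetical helper, not a driver method.

```ruby
# Hypothetical helper: returns "db.coll", "db" alone for database-level
# events, or nil when the change document has no ns field.
def change_namespace(doc)
  ns = doc['ns']
  return nil if ns.nil?

  # compact drops a missing collection name before joining.
  [ns['db'], ns['coll']].compact.join('.')
end
```

A process method could use the returned string to route changes to per-collection handlers.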

Closing a Change Stream

You can close a change stream by calling its #close method:

stream.close
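To make sure a stream is closed even when processing raises, the close call can be placed in an ensure clause. This is a minimal sketch: with_change_stream is a hypothetical helper, and it relies only on the #watch and #close methods described on this page.

```ruby
# Hypothetical helper: opens a change stream on any object that responds
# to #watch (a collection, database, or client), yields it, and always
# closes it afterwards.
def with_change_stream(watchable, pipeline = [])
  stream = watchable.watch(pipeline)
  yield stream
ensure
  # stream is nil if #watch itself raised, so guard the close call.
  stream.close if stream
end
```

With the collection from the earlier examples, with_change_stream(collection) { |stream| process(stream.to_enum.next) } would read one change and then close the stream.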

Resuming a Change Stream

The driver automatically retries getMore operations on a change stream once. The initial aggregation is never retried. In practical terms this means that, for example:

  • Calling collection.watch will fail if the cluster does not have enough available nodes to satisfy the "majority" read concern.
  • Once collection.watch successfully returns, if the cluster subsequently experiences an election or loses a node, but heals quickly enough, change stream reads via next or each methods will continue transparently to the application.

If the cluster loses so many nodes that it cannot satisfy the "majority" read concern and does not heal quickly enough, next and each will raise an error. In these cases the application must track, via the resume token, which documents from the change stream it has processed, and then create a new change stream object via the watch call, passing an appropriate :resume_after argument. The resume token is exposed at the key _id in each change document:

token = doc['_id']
stream = collection.watch([], resume_after: token)

To watch a collection indefinitely, retrying on all MongoDB errors:

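token = nil
while true
  begin
    # Start (or restart) the stream after the last processed change, if any.
    stream = collection.watch([], resume_after: token)
    enum = stream.to_enum
    while doc = enum.next
      process(doc)
      # Record the resume token only after the change has been processed.
      token = doc['_id']
    end
  rescue Mongo::Error
    # Back off briefly, then create a new change stream.
    sleep 1
  end
end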
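An application may prefer to give up after repeated failures and to close each stream before opening its replacement. The following variant is a sketch under those assumptions, not the driver's own behavior: watch_with_resume and its parameters are hypothetical, and the error class is passed in by the caller (with the driver this would typically be Mongo::Error).

```ruby
# Hypothetical helper (not part of the driver). Reads changes, resumes
# after errors of class retry_on, and re-raises after max_failures
# consecutive failures.
def watch_with_resume(collection, retry_on:, max_failures: 5, delay: 1)
  token = nil
  failures = 0
  # Kernel#loop ends quietly on StopIteration, which an Enumerator raises
  # when it has no more items.
  loop do
    stream = nil
    begin
      stream = collection.watch([], resume_after: token)
      enum = stream.to_enum
      while (doc = enum.next)
        yield doc
        # Record the token only after the block has handled the change.
        token = doc['_id']
        failures = 0
      end
    rescue retry_on
      failures += 1
      raise if failures >= max_failures
      sleep delay
    ensure
      # Close the old stream before a new one is opened.
      stream.close if stream
    end
  end
end
```

A call such as watch_with_resume(collection, retry_on: Mongo::Error) { |doc| process(doc) } would behave like the loop above, except that it stops after five consecutive errors.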