Navigation

Change Streams

As of version 3.6 of the MongoDB server, a new $changeStream pipeline stage is supported in the aggregation framework. Specifying this stage first in an aggregation pipeline allows users to request that notifications are sent for all changes to a particular collection. As of MongoDB 4.0, change streams are supported on databases and clusters in addition to collections.

The Ruby driver provides an API for receiving notifications for changes to a particular collection, database or cluster using this new pipeline stage. Although you can create a change stream using the pipeline operator and aggregation framework directly, it is recommended to use the driver API described below as the driver resumes the change stream one time if there is a timeout, a network error, a server error indicating that a failover is taking place or another type of a resumable error.

Change streams on the server require a "majority" read concern or no read concern.

Change streams do not work properly with JRuby because of the issue documented here. Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread, therefore calling #next on the change stream will cause getMores to be called in a loop in the background.

Watching for Changes on a Collection

A collection change stream is created by calling the #watch method on a collection:

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
collection = client[:test]
stream = collection.watch
collection.insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

You can also receive the notifications as they become available:

stream = collection.watch
enum = stream.to_enum
while doc = enum.next
  process(doc)
end

The next method blocks and polls the cluster until a change is available. Use the try_next method to iterate a change stream without blocking; this method will wait up to max_await_time_ms milliseconds for changes from the server, and if no changes are received it will return nil. If there is a non-resumable error, both next and try_next will raise an exception. See Resuming a Change Stream section below for an example that reads changes from a collection indefinitely.

The change stream can take filters in the aggregation framework pipeline operator format:

stream = collection.watch([{'$match' => { 'operationType' => {'$in' => ['insert', 'replace'] } } },
                           {'$match' => { 'fullDocument.n' => { '$gte' => 1 } } }
                          ])
enum = stream.to_enum
while doc = enum.next
  process(doc)
end

Watching for Changes on a Database

A database change stream notifies on changes on any collection within the database as well as database-wide events, such as the database being dropped.

A database change stream is created by calling the #watch method on a database object:

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
database = client.database
stream = database.watch
client[:test].insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

Watching for Changes on a Cluster

A cluster change stream notifies on changes on any collection, any database within the cluster as well as cluster-wide events.

A cluster change stream is created by calling the #watch method on a client object (not the cluster object):

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
stream = client.watch
client[:test].insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

Closing a Change Stream

You can close a change stream by calling its #close method:

stream.close

Resuming a Change Stream

A change stream consists of two types of operations: the initial aggregation and getMore requests to receive the next batch of changes.

The driver will automatically retry each getMore operation once on network errors and when the server returns an error indicating it changed state (for example, it is no longer the primary). The driver does not retry the initial aggregation.

In practical terms this means that, for example:

  • Calling collection.watch will fail if the cluster does not have enough available nodes to satisfy the "majority" read preference.
  • Once collection.watch successfully returns, if the cluster subsequently experiences an election or loses a node, but heals quickly enough, change stream reads via next or each methods will continue transparently to the application.

To indefinitely and reliably watch for changes without losing any changes or processing a change more than once, the application must track the resume token for the change stream and restart the change stream when it experiences extended error conditions that cause the driver’s automatic resume to also fail. The following code snippet shows an example of iterating a change stream indefinitely, retrieving the resume token using the resume_token change stream method and restarting the change stream using the :resume_after option on all MongoDB or network errors:

token = nil
loop do
  begin
    stream = collection.watch([], resume_after: token)
    enum = stream.to_enum
    while doc = enum.next
      process(doc)
      token = stream.resume_token
    end
  rescue Mongo::Error
    sleep 1
  end
end

The above iteration is blocking at the enum.next call, and does not permit resuming processing in the event the Ruby process running this code is terminated. The driver also provides the try_next method which returns nil (after a small waiting period) instead of blocking indefinitely when there are no changes in the change stream. Using the try_next method, the resume token may be persisted after each getMore request, even when a particular request does not return any changes, such that the resume token remains at the top of the oplog and the application has an opportunity to persist it should the process handling changes terminates:

token = nil
loop do
  begin
    stream = collection.watch([], resume_after: token)
    enum = stream.to_enum
    doc = enum.try_next
    if doc
      process(doc)
    end
    token = stream.resume_token
    # Persist +token+ to support resuming processing upon process restart
  rescue Mongo::Error
    sleep 1
  end
end

Note that the resume token should be retrieved from the change stream after every try_next call, even if the call returned no document.

The resume token is also provided in the _id field of each change stream document. Reading the _id field is not recommended because it may be projected out by the application, and because using only the _id field would not advance the resume token when a getMore returns no documents.

←   Transactions Databases  →