This version of the documentation is archived and no longer supported.

Change Streams

As of version 3.6 of the MongoDB server, a new $changeStream pipeline stage is supported in the aggregation framework. The Ruby driver provides an API for receiving notifications of changes to a particular collection using this pipeline stage. Although you can create a change stream using the pipeline operator and aggregation framework directly, it is recommended to use the driver API described below, because the driver resumes the change stream if there is a timeout or network error.

Change streams on the server require a "majority" read concern or no read concern.
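
As an illustration of the read concern rule above, the following is a minimal sketch of a check that could run before opening a change stream. The helper name change_stream_read_concern_ok? is hypothetical and not part of the driver; it assumes the read concern is given as nil or as a hash with a :level (or 'level') key.

```ruby
# Hypothetical helper, not part of the driver: returns true when the given
# read concern (nil, or a hash such as { level: :majority }) is compatible
# with change streams, i.e. it is absent or set to "majority".
def change_stream_read_concern_ok?(read_concern)
  return true if read_concern.nil? || read_concern.empty?

  level = read_concern[:level] || read_concern['level']
  level.nil? || level.to_s == 'majority'
end
```

An application could call this on its configured read concern and skip or reconfigure the watch call when it returns false.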

Change streams do not work properly with JRuby because of the issue documented here.

Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread, so calling #next on the change stream causes getMore commands to be issued in a loop in the background.
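
Given the JRuby limitation described above, an application may want to detect the Ruby engine before opening a change stream. The following is a minimal sketch; the helper name change_streams_supported? is hypothetical and not part of the driver. It relies only on the standard RUBY_ENGINE constant, which is 'jruby' on JRuby and 'ruby' on MRI.

```ruby
# Hypothetical guard, not part of the driver: reports whether the given Ruby
# engine is one where change streams are expected to behave as documented.
# The engine defaults to the one currently running.
def change_streams_supported?(engine = RUBY_ENGINE)
  engine != 'jruby'
end

# Emit a warning instead of opening a change stream on JRuby.
warn 'Change streams do not work properly on JRuby' unless change_streams_supported?
```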

Watching for changes on a particular collection

A change stream is created by calling the #watch method on a collection:

stream = collection.watch
collection.insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

You can also receive the notifications as they are available:

stream = client[:test].watch
enum = stream.to_enum
while doc = enum.next
  process(doc)
end

The change stream can take filters in the aggregation framework pipeline operator format:

stream = collection.watch([{'$match' => { 'operationType' => {'$in' => ['insert', 'replace'] } } },
                           {'$match' => { 'fullDocument.n' => { '$gte' => 1 } } }
                          ])
enum = stream.to_enum
while doc = enum.next
  process(doc)
end
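
When the filter values come from configuration, the pipeline shown above can be assembled programmatically before it is passed to #watch. The following is a minimal sketch; the helper name change_stream_pipeline and its keyword arguments are hypothetical, and the field name fullDocument.n is taken from the example above.

```ruby
# Hypothetical helper, not part of the driver: builds the $match stages from
# the example above. The second stage is added only when min_n is given.
def change_stream_pipeline(operation_types:, min_n: nil)
  pipeline = [{ '$match' => { 'operationType' => { '$in' => operation_types } } }]
  pipeline << { '$match' => { 'fullDocument.n' => { '$gte' => min_n } } } unless min_n.nil?
  pipeline
end

pipeline = change_stream_pipeline(operation_types: %w[insert replace], min_n: 1)
# The resulting array could then be passed to the driver:
#   stream = collection.watch(pipeline)
```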

Close a Change Stream

You can close a change stream by calling the #close method:

stream = collection.watch
collection.insert_one(a: 1)
doc = stream.to_enum.next
process(doc)
stream.close
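
If processing raises an exception, the #close call in the example above is never reached. One possible way to guarantee cleanup is to wrap the stream in a block helper with an ensure clause. The following is a minimal sketch; the helper name with_change_stream is hypothetical and not part of the driver. It assumes only that the collection responds to #watch and the returned stream responds to #close, as described above.

```ruby
# Hypothetical helper, not part of the driver: opens a change stream, yields
# it to the block, and closes it afterwards even if the block raises.
def with_change_stream(collection, pipeline = [])
  stream = collection.watch(pipeline)
  yield stream
ensure
  # stream is nil if #watch itself raised, hence the safe navigation.
  stream&.close
end

# Possible usage, given a collection and the process method from the
# examples above:
#   with_change_stream(collection) do |stream|
#     process(stream.to_enum.next)
#   end
```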