MongoDB\Database::watch()

New in version 1.4.

Definition

MongoDB\Database::watch

Executes a change stream operation on the database. The change stream reports changes to all non-system collections in the database.

function watch(array $pipeline = [], array $options = []): MongoDB\ChangeStream

This method has the following parameters:

Parameter Type Description
$pipeline array|object Optional. The pipeline of stages to append to an initial $changeStream stage.
$options array Optional. An array specifying the desired options.
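
As a minimal sketch of the $pipeline parameter, the following builds stages that limit the stream to insert and update events and trim each event to a few fields. The helper names buildInsertUpdatePipeline() and watchInsertsAndUpdates() are hypothetical, and $database is assumed to come from a MongoDB\Client connected to a replica set.

```php
<?php

/**
 * Hypothetical helper: builds the stages appended after $changeStream.
 * Only insert and update events pass the $match stage.
 */
function buildInsertUpdatePipeline(): array
{
    return [
        ['$match' => ['operationType' => ['$in' => ['insert', 'update']]]],
        // An inclusion projection keeps _id, which holds the resume token.
        ['$project' => ['operationType' => 1, 'ns' => 1, 'documentKey' => 1]],
    ];
}

/**
 * Opens a database-level change stream with the pipeline above.
 * Assumes $database was selected from a connected MongoDB\Client.
 */
function watchInsertsAndUpdates(MongoDB\Database $database): MongoDB\ChangeStream
{
    return $database->watch(buildInsertUpdatePipeline());
}
```

The stage names are written in single quotes so that PHP does not try to interpolate them as variables.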

The $options parameter supports the following options:

Option Type Description
batchSize integer

Optional. Specifies the batch size for the cursor, which will apply to both the initial aggregate command and any subsequent getMore commands. This determines the maximum number of change events to return in each response from the server.

Note

Irrespective of the batchSize option, the initial aggregate command response for a change stream generally does not include any documents unless another option is used to configure its starting point (e.g. startAfter).

collation array|object Optional. Collation allows users to specify language-specific rules for string comparison, such as rules for lettercase and accent marks. When specifying collation, the locale field is mandatory; all other collation fields are optional. For descriptions of the fields, see Collation Document.
comment mixed

Optional. Enables users to specify an arbitrary comment to help trace the operation through the database profiler, currentOp output, and logs.

The comment can be any valid BSON type for server versions 4.4 and above. Earlier server versions only support string values.

New in version 1.13.

fullDocument string

Optional. Determines how the “fullDocument” response field will be populated for update operations.

By default, change streams only return the delta of fields (via an “updateDescription” field) for update operations and “fullDocument” is omitted. Insert and replace operations always include the “fullDocument” field. Delete operations omit the field as the document no longer exists.

Specify “updateLookup” to return the current majority-committed version of the updated document.

MongoDB 6.0+ allows returning the post-image of the modified document if the collection has changeStreamPreAndPostImages enabled. Specify “whenAvailable” to return the post-image if available or a null value if not. Specify “required” to return the post-image if available or raise an error if not.

The following values are supported:

  • MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP
  • MongoDB\Operation\Watch::FULL_DOCUMENT_WHEN_AVAILABLE
  • MongoDB\Operation\Watch::FULL_DOCUMENT_REQUIRED

Note

This is an option of the $changeStream pipeline stage.

fullDocumentBeforeChange string

Optional. Determines how the “fullDocumentBeforeChange” response field will be populated. By default, the field is omitted.

MongoDB 6.0+ allows returning the pre-image of the modified document if the collection has changeStreamPreAndPostImages enabled. Specify “whenAvailable” to return the pre-image if available or a null value if not. Specify “required” to return the pre-image if available or raise an error if not.

The following values are supported:

  • MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE
  • MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED

Note

This is an option of the $changeStream pipeline stage.

maxAwaitTimeMS integer Optional. Positive integer denoting the time limit in milliseconds for the server to block a getMore operation if no data is available.
readConcern MongoDB\Driver\ReadConcern Optional. Read concern to use for the operation. Defaults to the database’s read concern.
readPreference MongoDB\Driver\ReadPreference

Optional. Read preference to use for the operation. Defaults to the database’s read preference.

This is used for both the initial change stream aggregation and for server selection during an automatic resume.

resumeAfter array|object

Optional. Specifies the logical starting point for the new change stream. The _id field in documents returned by the change stream may be used here.

Using this option in conjunction with startAfter and/or startAtOperationTime will result in a server error. The options are mutually exclusive.

Note

This is an option of the $changeStream pipeline stage.

session MongoDB\Driver\Session Optional. Client session to associate with the operation.
showExpandedEvents boolean

Optional. If true, instructs the server to include additional DDL events in the change stream. The additional events that may be included are:

  • createIndexes
  • dropIndexes
  • modify
  • create
  • shardCollection
  • reshardCollection (server 6.1+)
  • refineCollectionShardKey (server 6.1+)

This is not supported for server versions prior to 6.0 and will result in an exception at execution time if used.

Note

This is an option of the $changeStream pipeline stage.

New in version 1.13.

startAfter array|object

Optional. Specifies the logical starting point for the new change stream. The _id field in documents returned by the change stream may be used here. Unlike resumeAfter, this option can be used with a resume token from an “invalidate” event.

Using this option in conjunction with resumeAfter and/or startAtOperationTime will result in a server error. The options are mutually exclusive.

This is not supported for server versions prior to 4.2 and will result in an exception at execution time if used.

Note

This is an option of the $changeStream pipeline stage.

startAtOperationTime MongoDB\BSON\TimestampInterface

Optional. If specified, the change stream will only provide changes that occurred at or after the specified timestamp. Command responses from a MongoDB 4.0+ server include an operationTime that can be used here. By default, the operationTime returned by the initial aggregate command will be used if available.

Using this option in conjunction with resumeAfter and/or startAfter will result in a server error. The options are mutually exclusive.

This is not supported for server versions prior to 4.0 and will result in an exception at execution time if used.

Note

This is an option of the $changeStream pipeline stage.

typeMap array Optional. The type map to apply to cursors, which determines how BSON documents are converted to PHP values. Defaults to the database’s type map.
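
The options above can be combined in a single $options array. The sketch below is illustrative only: the helper names buildWatchOptions() and watchWithOptions() are hypothetical, and the numeric values are placeholders rather than recommendations.

```php
<?php

/**
 * Hypothetical helper: assembles an $options array for watch().
 * The numeric values are illustrative placeholders.
 */
function buildWatchOptions(): array
{
    return [
        // Return the current majority-committed document for update events.
        'fullDocument' => 'updateLookup',
        // Return at most 100 change events per server response.
        'batchSize' => 100,
        // Let the server block a getMore for up to 500 ms when no data is available.
        'maxAwaitTimeMS' => 500,
        // Convert BSON documents and arrays to plain PHP arrays.
        'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'],
    ];
}

/**
 * Opens a change stream with the options above.
 * Assumes $database was selected from a connected MongoDB\Client.
 */
function watchWithOptions(MongoDB\Database $database): MongoDB\ChangeStream
{
    // An empty pipeline appends no stages after $changeStream.
    return $database->watch([], buildWatchOptions());
}
```

The string 'updateLookup' could equally be written as the MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP constant listed under fullDocument.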

Return Values

A MongoDB\ChangeStream object, which allows for iteration of events in the change stream via the Iterator interface.
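
Iteration can be paired with the resumeAfter option so that a later run continues where an earlier one stopped. The following is a sketch under stated assumptions: resumeOptions() and consumeEvents() are hypothetical helpers, the caller is assumed to persist the returned token, and the default type map is assumed so that events support array access.

```php
<?php

/**
 * Hypothetical helper: converts a previously saved resume token into
 * watch() options. A null token sets no starting point.
 */
function resumeOptions($savedToken): array
{
    return $savedToken === null ? [] : ['resumeAfter' => $savedToken];
}

/**
 * Handles $limit events (at least 1) and returns the resume token of the
 * last handled event. Blocks until that many events have arrived.
 * Assumes $database was selected from a connected MongoDB\Client.
 */
function consumeEvents(MongoDB\Database $database, $savedToken, callable $handle, int $limit)
{
    $changeStream = $database->watch([], resumeOptions($savedToken));
    $handled = 0;

    for ($changeStream->rewind(); true; $changeStream->next()) {
        if (! $changeStream->valid()) {
            // No event yet; next() polls the server again.
            continue;
        }

        $event = $changeStream->current();
        $handle($event);

        // The _id field of each event may be used as a resume token.
        $savedToken = $event['_id'];

        if (++$handled >= $limit) {
            break;
        }
    }

    return $savedToken;
}
```

Saving the token only after $handle() returns means an event whose handler failed is delivered again on the next run instead of being skipped.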

Errors/Exceptions

MongoDB\Exception\UnexpectedValueException if the command response from the server was malformed.

MongoDB\Exception\UnsupportedException if options are used and not supported by the selected server (e.g. collation, readConcern).

MongoDB\Exception\InvalidArgumentException for errors related to the parsing of parameters or options.

MongoDB\Driver\Exception\RuntimeException for other errors at the driver level (e.g. connection errors).
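
One possible way to distinguish the exceptions listed above is sketched below. The helper names describeWatchFailure() and tryWatch() and the label strings are hypothetical; logging and returning null is only one of several reasonable reactions.

```php
<?php

/**
 * Hypothetical helper: maps an exception thrown by watch() to a short label.
 */
function describeWatchFailure(Throwable $e): string
{
    if ($e instanceof MongoDB\Exception\InvalidArgumentException) {
        return 'invalid pipeline or options';
    }
    if ($e instanceof MongoDB\Exception\UnsupportedException) {
        return 'option not supported by the selected server';
    }
    if ($e instanceof MongoDB\Exception\UnexpectedValueException) {
        return 'malformed server response';
    }
    if ($e instanceof MongoDB\Driver\Exception\RuntimeException) {
        return 'driver-level error';
    }

    return 'other error';
}

/**
 * Opens a change stream, or logs the failure and returns null.
 * Assumes $database was selected from a connected MongoDB\Client.
 */
function tryWatch(MongoDB\Database $database, array $pipeline = [], array $options = []): ?MongoDB\ChangeStream
{
    try {
        return $database->watch($pipeline, $options);
    } catch (Exception $e) {
        error_log(sprintf('watch() failed (%s): %s', describeWatchFailure($e), $e->getMessage()));

        return null;
    }
}
```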

Examples

This example reports events while iterating a change stream.

<?php

$uri = 'mongodb://rs1.example.com,rs2.example.com/?replicaSet=myReplicaSet';

$database = (new MongoDB\Client($uri))->test;

$changeStream = $database->watch();

for ($changeStream->rewind(); true; $changeStream->next()) {
    if ( ! $changeStream->valid()) {
        // No event is available yet; the next() call polls the server again.
        continue;
    }

    $event = $changeStream->current();

    if ($event['operationType'] === 'invalidate') {
        break;
    }

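    // Database-level streams also report events (e.g. drop, dropDatabase) that
    // lack ns.coll or documentKey, so fall back to placeholders for those.
    $ns = sprintf('%s.%s', $event['ns']['db'], $event['ns']['coll'] ?? '*');
    $id = json_encode($event['documentKey']['_id'] ?? null);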

    switch ($event['operationType']) {
        case 'delete':
            printf("Deleted document in %s with _id: %s\n\n", $ns, $id);
            break;

        case 'insert':
            printf("Inserted new document in %s\n", $ns);
            echo json_encode($event['fullDocument']), "\n\n";
            break;

        case 'replace':
            printf("Replaced document in %s with _id: %s\n", $ns, $id);
            echo json_encode($event['fullDocument']), "\n\n";
            break;

        case 'update':
            printf("Updated document in %s with _id: %s\n", $ns, $id);
            echo json_encode($event['updateDescription']), "\n\n";
            break;
    }
}

Assuming that a document was inserted, updated, and deleted while the above script was iterating the change stream, the output would then resemble:

Inserted new document in test.inventory
{"_id":{"$oid":"5a81fc0d6118fd1af1790d32"},"name":"Widget","quantity":5}

Updated document in test.inventory with _id: {"$oid":"5a81fc0d6118fd1af1790d32"}
{"updatedFields":{"quantity":4},"removedFields":[]}

Deleted document in test.inventory with _id: {"$oid":"5a81fc0d6118fd1af1790d32"}
