
Kafka Source Connector Guide

Overview

The MongoDB Kafka Source Connector moves data from a MongoDB replica set into a Kafka cluster. The connector configures and consumes change stream event documents and publishes them to a topic.

Change streams, a feature introduced in MongoDB 3.6, generate event documents that contain changes to data stored in MongoDB in real-time and provide guarantees of durability, security, and idempotency. You can configure change streams to observe changes at the collection, database, or deployment level. See An Introduction to Change Streams for more information.

Note

Change streams require a replica set or a sharded cluster that uses replica sets.

Message Delivery Guarantee

The source connector guarantees “at-least-once” delivery by default. If you set the copy.existing setting to true, the connector may deliver duplicate messages. Because these messages are idempotent, there is no need to support “at-most-once” or “exactly-once” delivery guarantees.

Change Stream Event Document Format

A change stream event document contains several fields that describe the event:

  • The top-level _id field serves as the resume token, which can be used to resume a change stream from a specific point in time.
  • The operationType field identifies the type of change represented in the change stream document. Possible values include: “insert”, “update”, “replace”, “delete”, “invalidate”, “drop”, “dropDatabase”, and “rename”.
  • The fullDocument field contents depend on the operation as follows:
    • For insert and replace operations, it contains the new document being inserted or replacing the existing document.
    • For update operations, it contains the complete document as of some point in time after the update occurred (available when change.stream.full.document is set to updateLookup). If the document was deleted after the update, it contains a null value.
  • The documentKey field contains the _id of the document that was changed and, for sharded collections, all components of the shard key.
  • The txnNumber and lsid fields identify the transaction if the change occurred within one.
{
  _id: { <BSON Object> },
  "operationType": "<operation>",
  "fullDocument": { <document> },
  "ns": {
    "db": <database>,
    "coll": <collection>
  },
  "to": {
    "db": <database>,
    "coll": <collection>
  },
  "documentKey": {
    _id: <value>
  },
  "updateDescription": {
    "updatedFields": { <document> },
    "removedFields": [ <field>, ... ]
  },
  "clusterTime": <Timestamp>,
  "txnNumber": <NumberLong>,
  "lsid": {
    "id": <UUID>,
    "uid": <BinData>
  }
}
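
For illustration, a change event for an insert into a hypothetical test.data namespace might look similar to the following; the values shown are examples only:

{
  "_id": { "_data": "<resume token>" },
  "operationType": "insert",
  "clusterTime": { "$timestamp": { "t": 1578511800, "i": 1 } },
  "fullDocument": { "_id": 1, "name": "example", "quantity": 5 },
  "ns": { "db": "test", "coll": "data" },
  "documentKey": { "_id": 1 }
}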

Source Connector Configuration Properties

The MongoDB Kafka Source Connector uses the following settings to create change streams and customize the output to save to the Kafka cluster. For an example source connector configuration file, see MongoSourceConnector.properties.
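
For illustration, a minimal configuration might look similar to the following sketch; the connection string, database, collection, and prefix values are placeholders for your own deployment:

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://localhost:27017/?replicaSet=rs0
database=test
collection=data
topic.prefix=mongo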

Name Type Description
connection.uri string
The MongoDB connection URI string to connect to your MongoDB deployment.

Avoid Exposing Your Authentication Credentials

To avoid exposing your authentication credentials in your connection.uri setting, use a ConfigProvider and set the appropriate configuration parameters.
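
Example

Kafka Connect's built-in FileConfigProvider can supply the connection string from an external file at runtime; the file path and property key below are hypothetical:

# Kafka Connect worker configuration
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# Source connector configuration
connection.uri=${file:/path/to/mongodb.properties:connection.uri}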

Default: mongodb://localhost:27017,localhost:27018,localhost:27019
Accepted Values: A valid MongoDB connection URI string
database string
Name of the database to watch for changes. If not set, all databases are watched.

Default: “”
Accepted Values: A single database name
collection string
Name of the collection in the database to watch for changes. If not set, all collections are watched.

Default: “”
Accepted Values: A single collection name
publish.full.document.only boolean
Whether to publish only the changed document instead of the full change stream event document. Automatically sets change.stream.full.document=updateLookup so that updated documents are included.

Default: false
Accepted Values: true or false
pipeline string
An array of objects describing the pipeline operations to run.

Example

[{"$match": {"operationType": "insert"}}, {"$addFields": {"Kafka": "Rules!"}}]
Default: []
Accepted Values: Valid aggregation pipeline stages
collation string
A JSON collation document that contains options to use for the change stream. Append .asDocument().toJson() to the collation document to create the JSON representation.
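
Example

A collation document produced this way might look similar to the following; only the locale option is shown and its value is illustrative:

collation={"locale": "en_US"}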

Default: “”
Accepted Values: A valid JSON document representing a collation
output.format.key string
Determines which data format the source connector outputs for the key document.

Default: json
Accepted Values: bson, json, schema
output.format.value string
Determines which data format the source connector outputs for the value document.

Default: json
Accepted Values: bson, json, schema
output.json.formatter string
Full class name of the JSON formatter.

Default: com.mongodb.kafka.connect.source.json.formatter.ExtendedJson
Accepted Values:
- com.mongodb.kafka.connect.source.json.formatter.DefaultJson
- com.mongodb.kafka.connect.source.json.formatter.ExtendedJson
- com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson
- Or other user-provided class name
output.schema.key string
The Avro schema definition for the key document of the SourceRecord.
Default:
{
  "type": "record",
  "name": "keySchema",
  "fields" : [ { "name": "_id", "type": "string" } ]"
}
Accepted Values: A valid JSON object
output.schema.value string
The Avro schema definition for the value document of the SourceRecord.

Default:
 {
   "name": "ChangeStream",
   "type": "record",
   "fields": [
     { "name": "_id", "type": "string" },
     { "name": "operationType", "type": ["string", "null"] },
     { "name": "fullDocument", "type": ["string", "null"] },
     { "name": "ns",
       "type": [{"name": "ns", "type": "record", "fields": [
                 {"name": "db", "type": "string"},
                 {"name": "coll", "type": ["string", "null"] } ]
                }, "null" ] },
     { "name": "to",
       "type": [{"name": "to", "type": "record",  "fields": [
                 {"name": "db", "type": "string"},
                 {"name": "coll", "type": ["string", "null"] } ]
                }, "null" ] },
     { "name": "documentKey", "type": ["string", "null"] },
     { "name": "updateDescription",
       "type": [{"name": "updateDescription",  "type": "record", "fields": [
                  {"name": "updatedFields", "type": ["string", "null"]},
                  {"name": "removedFields",
                   "type": [{"type": "array", "items": "string"}, "null"]
                   }] }, "null"] },
     { "name": "clusterTime", "type": ["string", "null"] },
     { "name": "txnNumber", "type": ["long", "null"]},
     { "name": "lsid", "type": [{"name": "lsid", "type": "record",
                "fields": [ {"name": "id", "type": "string"},
                              {"name": "uid", "type": "string"}] }, "null"] }
   ]
}
Accepted Values: A valid JSON object
output.schema.infer.value boolean
Whether the connector should infer the schema for the value document. Since each document is processed in isolation, multiple schemas may result. Only applies when output.format.value is set to schema.

Default: false
Accepted Values: true or false
offset.partition.name string
Custom partition name in which to store the offset values. The offset value stores information on where to resume processing if an issue requires you to restart the connector. By choosing a new partition name, you can start processing without using a resume token, which can make it easier to restart the connector without reconfiguring the Kafka Connect service or manually deleting the old offset. If blank, the default partition name based on the connection details is used. The offset partition is created automatically if it does not exist.

Default: “”
Accepted Values: A string
batch.size int
The cursor batch size.

Default: 0
Accepted Values: An integer
change.stream.full.document string
Determines what to return for update operations when using a change stream. When set to updateLookup, the change stream for partial updates includes both a delta describing the changes to the document and a copy of the entire document as it was at some point in time after the change occurred.

Default: “”
Accepted Values: “” or default or updateLookup
poll.await.time.ms long
The amount of time in milliseconds to wait before checking the change stream for new results.

Default: 5000
Accepted Values: An integer
poll.max.batch.size int
Maximum number of change stream documents to include in a single batch when polling for new data. This setting can be used to limit the amount of data buffered internally in the connector.

Default: 1000
Accepted Values: An integer
topic.prefix string
Prefix to prepend to database and collection names to generate the name of the Kafka topic to publish data to.
Default: “”
Accepted Values: A string
copy.existing boolean
Copy existing data from source collections and convert them to Change Stream events on their respective topics. Any changes to the data that occur during the copy process are applied once the copy is completed.

Default: false
Accepted Values: true or false
copy.existing.namespace.regex string
Regular expression that matches the namespaces from which to copy data. A namespace describes the database name and collection separated by a period, e.g. databaseName.collectionName.

Example

In the following example, the setting matches all collections that start with “page” in the “stats” database.

copy.existing.namespace.regex=stats\.page.*
Default: “”
Accepted Values: A valid regular expression
copy.existing.max.threads int
The number of threads to use when performing the data copy. Defaults to the number of processors.

Default: the number of processors
Accepted Values: An integer
copy.existing.queue.size int
The maximum size of the queue to use when copying data.
Default: 16000
Accepted Values: An integer
copy.existing.pipeline list
An array of JSON objects describing the pipeline operations to run when copying existing data. This can improve the use of indexes by the copying manager and make copying more efficient.

Example

In the following example, the $match aggregation operator ensures that only documents in which the closed field is set to false are copied.

copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]

Default: []
Accepted Values: Valid aggregation pipeline stages
errors.tolerance string
Whether to continue processing messages if an error is encountered. When set to none, the connector reports an error and blocks further processing of the rest of the records when it encounters an error. When set to all, the connector silently ignores any bad messages.

Default: “none”
Accepted Values: “none” or “all”
errors.log.enable boolean
Whether details of failed operations should be written to the log file. When set to true, both tolerated errors (as determined by the errors.tolerance setting) and untolerated errors are written. When set to false, tolerated errors are omitted.

Default: false
Accepted Values: true or false
errors.deadletterqueue.topic.name string
Name of topic to use as the dead letter queue. If blank, none of the invalid messages are written to the dead letter queue.

errors.tolerance must be set to all to use this property.
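
Example

The following settings route invalid messages to an illustrative mongo.deadletterqueue topic while logging all errors; errors.tolerance must be set to all for the dead letter queue to take effect:

errors.tolerance=all
errors.log.enable=true
errors.deadletterqueue.topic.name=mongo.deadletterqueue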

Default: “”
Accepted Values: A valid Kafka topic name
heartbeat.interval.ms int
The length of time in milliseconds between sending heartbeat messages to record a post batch resume token when no source records have been published. This can improve the resumability of the connector for low volume namespaces. Use 0 to disable.

Default: 0
Accepted Values: An integer
heartbeat.topic.name string
The name of the topic to write heartbeat messages to.
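
Example

For example, to record a post batch resume token every ten seconds on the default heartbeat topic, a configuration might include the following settings; the interval value is illustrative:

heartbeat.interval.ms=10000
heartbeat.topic.name=__mongodb_heartbeats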

Default: __mongodb_heartbeats
Accepted Values: A valid Kafka topic name

Note

The default maximum size for Kafka messages is 1MB. Update the following Kafka (versions 0.11.0 through 2.2) configuration properties to enable a larger maximum size if the JSON string size of the change stream documents exceeds the maximum:

System Property Name Description
Consumer max.partition.fetch.bytes Maximum size of a message that can be fetched by a consumer.
Broker replica.fetch.max.bytes Maximum size of a message that can be replicated within a Kafka cluster.
Broker message.max.bytes Maximum size of a message from a producer that is accepted by the broker.
Topic max.message.bytes Per topic, the maximum size of an uncompressed message that can be appended to the topic.
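
For example, to raise these limits to roughly 16 MB, the settings might look similar to the following sketch; the value is illustrative, and each property belongs in its respective consumer, broker, or topic configuration:

# Consumer configuration
max.partition.fetch.bytes=16777216

# Broker configuration
replica.fetch.max.bytes=16777216
message.max.bytes=16777216

# Topic configuration
max.message.bytes=16777216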

Custom Pipeline Example

You can use the pipeline configuration setting to define a custom aggregation pipeline to filter or modify the change events output. In this example, we set the pipeline configuration to observe only insert change events:

pipeline=[{"$match": {"operationType": "insert"}}]

Note

Make sure the results of the aggregation pipeline contain the top-level _id field which MongoDB uses as the resume token.

Multiple Source Example

You can configure the source connector to listen for events from multiple collections by using the pipeline configuration with a custom aggregation pipeline to match your collection names.

The following sample configuration shows how you can set your source connector to listen to the collection1 and collection2 collections by matching with a regular expression on the collection names:

pipeline=[{"$match": {"ns.coll": {"$regex": /^(collection1|collection2)$/}}}]

You can also set a regular expression that matches all collections except those with specific names. The following sample configuration shows how you can set your source connector to listen to all collections except the collection named “customers”:

"pipeline":[{"$match": {"ns.coll": {"$regex": "/^(?!customers\.).*/"}}}]

For more information on how to build regular expressions, see the Java SE documentation on Patterns.

Topic Naming Example

The MongoDB Kafka Source connector publishes the changed data events to a Kafka topic that consists of the database and collection name from which the change originated. For example, if an insert was performed on the test database and data collection, the connector will publish the data to a topic named test.data.

If the topic.prefix configuration is set, the Kafka topic name is prepended with the specified value. For example:

topic.prefix=mongo

Once set, any data changes to the data collection in the test database are published to a topic named mongo.test.data.

Existing Data Copy Example

The MongoDB Kafka Source connector can be configured to copy existing data from collections in a database to their associated topic as insert events prior to broadcasting change stream events. The connector does not support renaming a collection during the copy process.

Data Copy Can Produce Duplicate Events

If clients make changes to the data in the database while the source connector is converting existing data, the subsequent change stream events may contain duplicates. Since change stream events are idempotent, the data is eventually consistent.

The following configuration example instructs the connector to copy all collections in the example database, convert the existing data to change stream events, and then broadcast subsequent changes to the collection data:

database=example
copy.existing=true
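
To restrict the copy to particular collections, you can add the copy.existing.namespace.regex setting described above; the collection name pattern below is hypothetical:

copy.existing.namespace.regex=example\.products.*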

How to Recover from an Invalid Resume Token

If your connector pauses or shuts down long enough for the change stream resume token to expire from the MongoDB oplog, you may encounter an error that prevents the connector from starting. If this happens, delete the topic data referenced by the offset.storage.topic setting if you are running Kafka Connect in distributed mode, or delete the file referenced by the offset.storage.file.filename setting (e.g. /tmp/connect.offsets) if you are running in standalone mode. After you delete the appropriate offset data, you can restart your connector workers and listen to the change stream again.
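
Alternatively, as described in the offset.partition.name setting above, you can point the connector at a new offset partition so that it starts a fresh change stream without a resume token; the partition name below is arbitrary:

offset.partition.name=mongo-source-restart-1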