Kafka Source Connector Guide

The MongoDB Kafka Source Connector moves data from a MongoDB replica set into a Kafka cluster. The connector configures and consumes change stream event documents and publishes them to a topic.

Change streams, a feature introduced in MongoDB 3.6, generate event documents that contain changes to data stored in MongoDB in real time and provide guarantees of durability, security, and idempotency. You can configure change streams to observe changes at the collection, database, or deployment level. See An Introduction to Change Streams for more information.

Important

Change streams require a replica set or a sharded cluster that uses replica sets. A standalone MongoDB instance cannot produce a change stream.

Note
Change Stream Scaling Constraints

You can configure at most one Source Connector task to consume data from each change stream. If you need to read data from a change stream more quickly, consider creating multiple source change streams that produce events for non-overlapping criteria.

Example

Suppose your change stream listens for new insert events on a collection that tracks webpage view statistics by registered and anonymous users. The change stream pipeline might resemble the following:

pipeline=[{"$match": {"eventType": "views"}}]

In order to process changes with multiple Source Connector tasks, create each change stream with non-overlapping criteria as shown in the following pipeline configurations:

Source connector 1 configuration:

pipeline=[{"$match": {"$and": [{"isAnonymous": "true"}, {"eventType": "views"}]}}]

Source connector 2 configuration:

pipeline=[{"$match": {"$and": [{"isAnonymous": "false"}, {"eventType": "views"}]}}]

Avoid segmenting change streams that may depend on the order of events relative to each other. Separate Source Connector tasks cannot guarantee that they consume events in order relative to each other.

The Source Connector guarantees "at-least-once" delivery by default. If you set the copy.existing setting to true, the connector may deliver duplicate messages. Because these messages are idempotent, there is no need to support "at-most-once" or "exactly-once" delivery guarantees.

A change stream event document contains several fields that describe the event:

  • The top-level _id field serves as the resume token, which is used to start a change stream from a specific point in time.
  • The operationType field identifies the type of change represented in the change stream document. Possible values include: "insert", "update", "replace", "delete", "invalidate", "drop", "dropDatabase", and "rename".
  • The fullDocument field contents depend on the operation as follows:

    • For insert and replace operations, it contains the new document being inserted or replacing the existing document.
    • For update operations, it contains the complete document as it exists at some point in time after the update occurred. If the document was deleted after the update, it contains a null value.
  • The documentKey field contains the _id field of the changed document and, for sharded collections, all the components of the shard key.
  • The txnNumber and lsid identify the transaction if the change occurred within one.
The following shows the structure of a change stream event document:

{
   _id: { <BSON Object> },
   "operationType": "<operation>",
   "fullDocument": { <document> },
   "ns": {
      "db": <database>,
      "coll": <collection>
   },
   "to": {
      "db": <database>,
      "coll": <collection>
   },
   "documentKey": {
      _id: <value>
   },
   "updateDescription": {
      "updatedFields": { <document> },
      "removedFields": [ <field>, ... ]
   },
   "clusterTime": <Timestamp>,
   "txnNumber": <NumberLong>,
   "lsid": {
      "id": <UUID>,
      "uid": <BinData>
   }
}

The MongoDB Kafka Source Connector uses the following settings to create change streams and customize the output to save to the Kafka cluster. For an example source connector configuration file, see MongoSourceConnector.properties.

Settings
connection.uri

Type: string
Description:
The MongoDB connection URI string used to connect to your MongoDB instance or cluster. For example:
mongodb://username:password@localhost/
For additional information, see the MongoDB manual entry on the Connection String URI Format.
Important
Avoid Exposing Your Authentication Credentials

To avoid exposing your authentication credentials in your connection.uri setting, use a ConfigProvider and set the appropriate configuration parameters.
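For example, a minimal sketch using Kafka's built-in FileConfigProvider, assuming a hypothetical secrets file at /etc/kafka/secrets.properties that contains a connection.uri entry (the config.providers settings belong in the Connect worker configuration):

config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
connection.uri=${file:/etc/kafka/secrets.properties:connection.uri}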

Default: mongodb://localhost:27017,localhost:27018,localhost:27019
Accepted Values: A valid MongoDB connection URI string
database

Type: string
Description:
Name of the database to watch for changes. If not set, all databases are watched.

Default: ""
Accepted Values: A single database name
collection

Type: string
Description:
Name of the collection in the database to watch for changes. If not set, all collections are watched.
Important

If your database configuration is set to "", the connector ignores the collection setting.
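For example, a minimal sketch that watches a single namespace (the test database and data collection shown here are illustrative):

database=test
collection=data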

Default: ""
Accepted Values: A single collection name
server.api.version

Type: string
Description:
The API version you want to use with your MongoDB instance. For more information on the Versioned API, see the MongoDB manual entry on the Versioned API.

Default: ""
Accepted Values: An empty string or a valid server version. At this time, the only valid server version is "1".
server.api.deprecationErrors

Type: boolean
Description:
When set to true, calling a command on your MongoDB instance that is deprecated in the declared API version causes the connector to raise an exception. You can set the API version with the server.api.version configuration option. For more information on the Versioned API, see the MongoDB manual entry on the Versioned API.

Default: false
Accepted Values: true or false
server.api.strict

Type: boolean
Description:
When set to true, calling a command on your MongoDB instance that is not part of the declared API version causes the connector to raise an exception. You can set the API version with the server.api.version configuration option. For more information on the Versioned API, see the MongoDB manual entry on the Versioned API.
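For example, a hedged sketch that declares API version 1 and opts in to both strict and deprecation-error behavior (whether you want these enabled depends on your deployment):

server.api.version=1
server.api.strict=true
server.api.deprecationErrors=true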

Default: false
Accepted Values: true or false
publish.full.document.only

Type: boolean
Description:
Publish only the changed document instead of the full change stream event document. Automatically sets change.stream.full.document=updateLookup so that updated documents are included.

Default: false
Accepted Values: true or false
pipeline

Type: string
Description:
An array of objects describing the pipeline operations to run.
Example
[{"$match": {"operationType": "insert"}}, {"$addFields": {"Kafka": "Rules!"}}]
Default: []
Accepted Values: Valid aggregation pipeline stages
collation

Type: string
Description:
A JSON collation document that contains options to use for the change stream. To produce the JSON representation, build the collation options and call .asDocument().toJson() on the resulting collation object.
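For example, a minimal Java sketch (the locale and strength values are illustrative, not required) that prints a collation JSON document you can paste into this setting:

import com.mongodb.client.model.Collation;
import com.mongodb.client.model.CollationStrength;

public class CollationJson {
    public static void main(String[] args) {
        // Build the collation options, then print the JSON representation
        // to use as the value of the collation setting.
        Collation collation = Collation.builder()
                .locale("en")
                .collationStrength(CollationStrength.SECONDARY)
                .build();
        System.out.println(collation.asDocument().toJson());
    }
}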

Default: ""
Accepted Values: A valid collation JSON document
output.format.key

Type: string
Description:
Determines which data format the source connector outputs for the key document.

Default: json
Accepted Values: bson, json, schema
output.format.value

Type: string
Description:
Determines which data format the source connector outputs for the value document.

Default: json
Accepted Values: bson, json, schema
output.json.formatter

Type: string
Description:
Full class name of the JSON formatter. You can also provide your own custom JSON formatter.

Default: com.mongodb.kafka.connect.source.json.formatter.DefaultJson
Accepted Values:
- com.mongodb.kafka.connect.source.json.formatter.DefaultJson
- com.mongodb.kafka.connect.source.json.formatter.ExtendedJson
- com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson
- Or your custom JSON formatter class name
output.schema.key

Type: string
Description:
The Avro schema definition for the key document of the SourceRecord.

Default:
{
   "type": "record",
   "name": "keySchema",
   "fields" : [ { "name": "_id", "type": "string" } ]
}
Accepted Values: A valid JSON object
output.schema.value

Type: string
Description:
The Avro schema definition for the value document of the SourceRecord.
Default:
{
  "name": "ChangeStream",
  "type": "record",
  "fields": [
    { "name": "_id", "type": "string" },
    { "name": "operationType", "type": ["string", "null"] },
    { "name": "fullDocument", "type": ["string", "null"] },
    { "name": "ns",
      "type": [{"name": "ns", "type": "record", "fields": [
        {"name": "db", "type": "string"},
        {"name": "coll", "type": ["string", "null"] } ]
      }, "null" ] },
    { "name": "to",
      "type": [{"name": "to", "type": "record", "fields": [
        {"name": "db", "type": "string"},
        {"name": "coll", "type": ["string", "null"] } ]
      }, "null" ] },
    { "name": "documentKey", "type": ["string", "null"] },
    { "name": "updateDescription",
      "type": [{"name": "updateDescription", "type": "record", "fields": [
        {"name": "updatedFields", "type": ["string", "null"]},
        {"name": "removedFields",
          "type": [{"type": "array", "items": "string"}, "null"]
        }] }, "null"] },
    { "name": "clusterTime", "type": ["string", "null"] },
    { "name": "txnNumber", "type": ["long", "null"]},
    { "name": "lsid", "type": [{"name": "lsid", "type": "record",
      "fields": [ {"name": "id", "type": "string"},
        {"name": "uid", "type": "string"}] }, "null"] }
  ]
}
Accepted Values: A valid JSON object
output.schema.infer.value

Type: boolean
Description:
Whether the connector should infer the schema for the value. Since each document is processed in isolation, multiple schemas may result. Only valid when schema is specified in the output.format.value setting.
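For example, a minimal sketch that outputs schema-formatted values and lets the connector infer the schema from each document:

output.format.value=schema
output.schema.infer.value=true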

Default: false
Accepted Values: true or false
batch.size

Type: int
Description:
The cursor batch size.

Default: 0
Accepted Values: An integer
change.stream.full.document

Type: string
Description:
Determines what the change stream returns for update operations. When set to updateLookup, the change stream includes both a delta describing the changes to the document and a copy of the entire document as it exists at some point in time after the change occurred.

Default: ""
Accepted Values: "" or default or updateLookup
poll.await.time.ms

Type: long
Description:
The amount of time in milliseconds to wait before checking the change stream for new results.

Default: 5000
Accepted Values: An integer
poll.max.batch.size

Type: int
Description:
Maximum number of change stream documents to include in a single batch when polling for new data. This setting can be used to limit the amount of data buffered internally in the connector.
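For example, a hedged tuning sketch (the values are illustrative) that buffers smaller batches and checks for new results more often:

poll.max.batch.size=100
poll.await.time.ms=1000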

Default: 1000
Accepted Values: An integer
topic.prefix

Type: string
Description:
Prefix to prepend to database and collection names to generate the name of the Kafka topic to publish data to.
Default: ""
Accepted Values: A string
topic.suffix

Type: string
Description:
Suffix to append to database and collection names to generate the name of the Kafka topic to publish data to.
Default: ""
Accepted Values: A string
topic.namespace.map

Type: string
Description:
JSON object that maps change stream document namespaces to topics.
Example

The following configuration specifies two mappings:

  • All change documents in the myDb.myColl collection are sent to the topicTwo topic
  • All other change documents in the myDb database are sent to the topicOne topic
topic.namespace.map={"myDb": "topicOne", "myDb.myColl": "topicTwo"}
You can also use the "*" wildcard character to match namespaces.
Example

The following configuration specifies a mapping from all of the change stream document namespaces to the topicThree topic:

topic.namespace.map={"*": "topicThree"}
Default: ""
Accepted Values: A valid JSON object
topic.mapper

Type: string
Description:
Full class name of the class that specifies custom topic mapping logic.

Default: com.mongodb.kafka.connect.source.topic.mapping.DefaultTopicMapper
Accepted Values: Valid full class name of an implementation of the TopicMapper class.
copy.existing

Type: boolean
Description:
Whether to copy existing data from source collections and convert it to Change Stream events on their respective topics. Any changes to the data that occur during the copy process are applied once the copy completes.

Default: false
Accepted Values: true or false
copy.existing.namespace.regex

Type: string
Description:
Regular expression that matches the namespaces from which to copy data. A namespace describes the database name and collection separated by a period, e.g. databaseName.collectionName.
Example

In the following example, the setting matches all collections that start with "page" in the "stats" database.

copy.existing.namespace.regex=stats\.page.*
In the example above, the "\" character escapes the following "." character in the regular expression. For more information on how to build regular expressions, see the Java SE documentation on Patterns.
Default: ""
Accepted Values: A valid regular expression
copy.existing.max.threads

Type: int
Description:
The number of threads to use when performing the data copy. Defaults to the number of processors.

Default: the number of processors
Accepted Values: An integer
copy.existing.queue.size

Type: int
Description:
The max size of the queue to use when copying data.
Default: 16000
Accepted Values: An integer
copy.existing.pipeline

Type: list
Description:
An array of JSON objects describing the pipeline operations to run when copying existing data. This can improve the use of indexes by the copying manager and make copying more efficient.
Example

In the following example, the $match aggregation operator ensures that only documents in which the closed field is set to false are copied.

copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]
Default: []
Accepted Values: Valid aggregation pipeline stages
mongo.errors.tolerance

Type: string
Description:
Whether to continue processing messages if the connector encounters an error. When set to none, the connector reports any error and blocks further processing of the rest of the messages. When set to all, the connector silently ignores any bad messages.

This property overrides the errors.tolerance property of the Connect Framework.

Default: "none"
Accepted Values: "none" or "all"
mongo.errors.log.enable

Type: boolean
Description:
Whether the connector should write details of failed operations to the log file. When set to true, the connector logs both tolerated and non-tolerated errors (as determined by the errors.tolerance or mongo.errors.tolerance setting). When set to false, the connector logs only the errors that are not tolerated.

This property overrides the errors.log.enable property of the Connect Framework.

Default: false
Accepted Values: true or false
mongo.errors.deadletterqueue.topic.name

Type: string
Description:
Name of topic to use as the dead letter queue. If blank, the connector does not write invalid messages to the dead letter queue. If a value is provided, the connector writes invalid messages to the dead letter queue as extended JSON strings.

You must set errors.tolerance or mongo.errors.tolerance to all to use this property.

This property overrides the errors.deadletterqueue.topic.name property of the Connect Framework.
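For example, a minimal sketch that tolerates errors and routes invalid messages to a hypothetical dead letter queue topic:

mongo.errors.tolerance=all
mongo.errors.deadletterqueue.topic.name=example.deadletterqueue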

Default: ""
Accepted Values: A valid Kafka topic name
offset.partition.name

Type: string
Description:
A custom offset partition name to use. This option can be used to start a new change stream when an existing offset contains an invalid resume token. If blank, the default partition name based on the connection details is used.
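For example, a hedged sketch that starts a fresh change stream under a new, illustrative partition name after an existing offset became unusable:

offset.partition.name=mongo-source-restart-1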
Default: ""
Accepted Values: A valid partition name
heartbeat.interval.ms

Type: int
Description:
The length of time in milliseconds between sending heartbeat messages. Heartbeat messages contain a postBatchResumeToken. The value of this field contains the MongoDB server oplog entry that was most recently read from the change stream. The connector sends heartbeat messages when source records are not published in the specified interval. This improves the resumability of the connector for low volume namespaces. See the Invalid Resume Token section of this page for more information on the problem this feature can help solve. Use 0 to disable.
Default: 0
Accepted Values: An integer
heartbeat.topic.name

Type: string
Description:
The name of the topic to publish heartbeat messages to. To enable the heartbeat feature, you must provide a positive value in the heartbeat.interval.ms setting.

Default: __mongodb_heartbeats
Accepted Values: A valid Kafka topic name
Note

The default maximum size for Kafka messages is 1MB. Update the following Kafka (versions 0.11.0 through 2.2) configuration properties to enable a larger maximum size if the JSON string size of the change stream documents exceeds the maximum:

  • Consumer: the maximum size of a message that can be fetched by a consumer.
  • Broker: the maximum size of a message that can be replicated within a Kafka cluster.
  • Broker: the maximum size of a message from a producer that is accepted by the broker.
  • Producer: per referenced topic, the maximum size of an uncompressed message that can be appended to a topic.
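As a hedged sketch, the following are commonly adjusted Kafka properties for the roles above (confirm the exact property names and appropriate values for your Kafka version; the names and the 10 MB value shown are illustrative, not taken from this page):

# Consumer
max.partition.fetch.bytes=10485760
# Broker
replica.fetch.max.bytes=10485760
message.max.bytes=10485760
# Producer
max.request.size=10485760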

You can use the pipeline configuration setting to define a custom aggregation pipeline to filter or modify the change events output. In this example, we set the pipeline configuration to observe only insert change events:

pipeline=[{"$match": {"operationType": "insert"}}]
Note

Make sure the results of the aggregation pipeline contain the top-level _id field which MongoDB uses as the resume token.

You can configure the source connector to listen for events from multiple collections by using the pipeline configuration with a custom aggregation pipeline to match your collection names.

The following sample configuration shows how you can set your source connector to listen to the collection1 and collection2 collections by matching with a regular expression on the collection names:

pipeline=[{"$match": {"ns.coll": {"$regex": /^(collection1|collection2)$/}}}]

You can also set a regular expression to match all collections except for ones that match specific names. The following sample configuration shows how you can set your source connector to listen to all collections except the collection named "customers":

"pipeline":[{"$match": {"ns.coll": {"$regex": "/^(?!customers$).*/"}}}]

For more information on how to build regular expressions, see the Java SE documentation on Patterns.

By default, the MongoDB Kafka Source connector publishes change data events to a Kafka topic whose name consists of the database and collection name (also known as a namespace) from which the change originated. For example, if an insert is performed on the test database and data collection, the connector publishes the data to a topic named test.data.

The following examples show how you can configure the topic name for change data events.

If you specify a value in the topic.prefix configuration setting, the connector prepends that value to the Kafka topic name. For example:

topic.prefix=mongo

Once set, any data changes to the data collection in the test database are published to a topic named mongo.test.data.

If you specify a value in the topic.suffix configuration setting, the connector appends that value to the Kafka topic name. For example:

topic.suffix=mongo

Once set, any data changes to the data collection in the test database are published to a topic named test.data.mongo.

If you specify mappings in the topic.namespace.map configuration setting, the connector publishes using the default topic naming scheme unless otherwise specified by the mapping.

Any mapping that includes both database and collection takes precedence over mappings that only specify the source database name. The following configuration example shows mappings for the carDb database as well as the carDb.ev namespace:

topic.namespace.map={"carDb": "automobiles", "carDb.ev": "electricVehicles"}

Since the carDb.ev mapping takes precedence over the carDb mapping, the connector performs the following:

  • Change documents from the carDb database and ev collection are sent to the electricVehicles topic.
  • Change documents from the carDb database that are not from the ev collection are sent to the automobiles topic.
  • Change documents from any database other than carDb are sent to the topic determined by the default namespace naming scheme, which includes any value specified in the topic.prefix or topic.suffix setting.

You can use the * wildcard to match any namespaces without mappings, as shown in the following configuration example:

topic.namespace.map={"carDb": "automobiles", "carDb.ev": "electricVehicles", "*": "otherVehicles"}

In the preceding wildcard example, the connector publishes change documents from any namespace outside the carDb database to the otherVehicles topic.

You can configure the MongoDB Kafka Source connector to copy existing data from collections in a database to their associated topic as insert events prior to broadcasting change stream events. The connector does not support renaming a collection during the copy process.

Note
Data Copy Can Produce Duplicate Events

If clients make changes to the data in the database while the source connector is converting existing data, the subsequent change stream events may contain duplicates. Since change stream events are idempotent, the data is eventually consistent.

The following configuration example instructs the connector to copy all collections in the example database, convert the existing data to change stream events, and then broadcast any changes that occur after the copy completes.

database=example
copy.existing=true

You can use the copy.existing.namespace.regex setting to provide a regular expression that matches specific collections by their namespace (database and collection name, separated with a "." character).

copy.existing.namespace.regex=stats\.page.*
copy.existing=true

The configuration shown above matches and copies existing data from all collections in the stats database whose names begin with "page". For example, it matches stats.pageViews and stats.pageAverageRevenue, but not stat.pageViews or stats.uniquePages.

You can use the copy.existing.pipeline setting to provide a MongoDB aggregation pipeline expression that matches specific documents to include in the copy operation.

copy.existing.pipeline=[ { "$match": { "totalUniqueViews": { "$gte": 5000 } } } ]
copy.existing=true

The configuration shown above matches and copies existing documents that contain a field named totalUniqueViews with a value of 5000 or greater.

If your Connector pauses or shuts down long enough for the change stream resume token (also referred to as the postBatchResumeToken) to expire from the MongoDB oplog, you may encounter an error that prevents the Connector from starting. If you encounter this condition, delete the topic data referenced by the offset.storage.topic setting if you are running in Distributed mode, or the file referenced by the offset.storage.file.filename setting (e.g. /tmp/connect.offsets) if you are running in Standalone mode. After you delete the appropriate data, you should be able to start your Connector workers and listen to the change stream.

Tip
Consider Enabling Heartbeats

If you encounter an invalid resume token error because your source MongoDB namespace is infrequently updated, consider enabling the heartbeat feature. This feature periodically advances the change stream resume token to the current entry in your MongoDB oplog when no updates occur in your source MongoDB namespace. To learn how to enable heartbeats, see the Source Connector Configuration Properties section.
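For example, a minimal sketch that sends a heartbeat every 10 seconds (the interval value is illustrative) to the default heartbeat topic:

heartbeat.interval.ms=10000
# optional: this is already the default topic name
heartbeat.topic.name=__mongodb_heartbeats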
