Kafka Sink Connector Configuration Properties

This section lists the available configuration settings used to compose a properties file for the MongoDB Kafka Sink Connector. The connector uses these settings to determine which topics to consume data from and what data to sink to MongoDB. For an example configuration file, see MongoSinkConnector.properties.

topics (list)
A list of Kafka topics that the sink connector should watch.

Required
Note: You can only define either topics or topics.regex.
Accepted Values: A comma-separated list of valid Kafka topics
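
For example, the following sketch (topic names are illustrative) consumes from two topics:

topics=activity.landing.clicks,activity.support.clicks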
topics.regex (string)
A regular expression that matches the Kafka topics that the sink connector should watch.

The following regex matches topics such as "activity.landing.clicks" and "activity.support.clicks", but not "activity.landing.views" or "activity.clicks":

topics.regex=activity\\.\\w+\\.clicks$
Required
Note: You can only define either topics or topics.regex.
Accepted Values: A valid regular expression match pattern using java.util.regex.Pattern.
connection.uri (string)
The MongoDB connection URI string to connect to your MongoDB instance or cluster, for example: mongodb://username:password@localhost/

Important: Avoid Exposing Your Authentication Credentials

To avoid exposing your authentication credentials in your connection.uri setting, use a ConfigProvider and set the appropriate configuration parameters.

Default: mongodb://localhost:27017
Accepted Values: A valid MongoDB connection URI string
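
A minimal sketch using Kafka's built-in FileConfigProvider (the provider is registered in the Kafka Connect worker configuration; the file path and key name below are placeholders):

config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
connection.uri=${file:/path/to/sink.secrets.properties:connection.uri}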
database (string)
The name of the MongoDB database the sink writes to.

Required
Accepted Values: A valid MongoDB database name
collection (string)
The name of the single MongoDB collection to write to. If the sink connector follows multiple topics, this is the default collection to which they are mapped.

Required
Accepted Values: A MongoDB collection name
document.id.strategy (string)
The fully qualified class name of the strategy class that generates a unique _id value for each document.

Default: com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Accepted Values: An empty string or a fully qualified Java class name
document.id.strategy.overwrite.existing (boolean)
Whether the connector should overwrite existing values in the _id field when the strategy defined in document.id.strategy is applied.

Default: false
Accepted Values: true or false
document.id.strategy.uuid.format (string)
Whether the connector should output the UUID in the _id field as a string or in the BsonBinary format.

Default: string
Accepted Values: string or binary
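
For example, the following sketch applies the UuidStrategy shipped with the connector, overwrites any existing _id values, and stores the UUID in binary format:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
document.id.strategy.overwrite.existing=true
document.id.strategy.uuid.format=binary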
delete.on.null.values (boolean)
Whether the connector should delete documents with matching key values when the value is null.

Default: false
Accepted Values: true or false
max.batch.size (int)
The maximum number of sink records to batch together for processing.

Default: 0
Accepted Values: An integer
max.num.retries (int)
The number of retries to attempt on write errors.

Default: 1
Accepted Values: An integer
retries.defer.timeout (int)
How long (in milliseconds) to defer a retry.

Default: 5000
Accepted Values: An integer
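
For example, the following hypothetical settings retry a failed write up to three times, deferring five seconds between attempts:

max.num.retries=3
retries.defer.timeout=5000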
change.data.capture.handler (string)
The class name of the CDC handler to use for processing.

Default: ""
Accepted Values: An empty string or a fully qualified Java class name
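
A sketch selecting a CDC handler; the Debezium MongoDB handler class name below is an assumption, so verify it against the handlers shipped with your connector version:

change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler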
field.renamer.mapping (string)
An inline JSON array with objects describing field name mappings.
[ { "oldName":"key.fieldA", "newName":"field1" }, { "oldName":"value.xyz", "newName":"abc" } ]
Default: []
Accepted Values: A valid JSON array
field.renamer.regexp (string)
An inline JSON array containing regular expression statement objects.
[ {"regexp":"^key\\\\..*my.*$", "pattern":"my", "replace":""}, {"regexp":"^value\\\\..*$", "pattern":"\\\\.", "replace":"_"} ]
Default: []
Accepted Values: A valid JSON array
key.projection.list (string)
A list of field names to include in the key projection.

Default: ""
Accepted Values: A comma-separated list of field names
key.projection.type (string)
The type of key projection to use.

Default: none
Accepted Values: none, BlockList, or AllowList (Deprecated: blacklist, whitelist)
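
For example, the following sketch (field names are illustrative) includes only the listed fields in the key projection:

key.projection.type=AllowList
key.projection.list=fieldA,fieldB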
post.processor.chain (list)
A list of post-processor classes that process the data before saving it to MongoDB.

Default: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Accepted Values: A comma-separated list of fully qualified Java class names
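
For example, the following chain adds a document _id and then applies the BlockListValueProjector, mirroring the topic-override example later on this page:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
value.projection.type=BlockList
value.projection.list=k2,k4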
rate.limiting.every.n (int)
The number of processed batches that triggers the rate limit. A value of 0 means no rate limiting.

Default: 0
Accepted Values: An integer
rate.limiting.timeout (int)
How long (in milliseconds) to wait before continuing to process data once the rate limit is reached.

Default: 0
Accepted Values: An integer
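
For example, with the following hypothetical values, the connector pauses for two seconds after every ten processed batches:

rate.limiting.every.n=10
rate.limiting.timeout=2000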
value.projection.list (string)
A list of field names to include in the value projection.

Default: ""
Accepted Values: A comma-separated list of field names
value.projection.type (string)
The type of value projection to use.

Default: none
Accepted Values: none, BlockList, or AllowList (Deprecated: blacklist, whitelist)
writemodel.strategy (string)
The fully qualified class name of the class that specifies the WriteModel to use for bulk writes. See Custom Write Models for more information.

Default: com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
Accepted Values: A fully qualified Java class name of a class that implements WriteModelStrategy
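
A sketch selecting an alternative strategy; the class name below is an assumption, so verify it against the strategies shipped with your connector version:

writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy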
topic.override.<topicName>.<propertyName> (string)
Per-topic configurations that override the corresponding global and default property settings. For example: topic.override.foo.collection=bar instructs the sink connector to store data from the foo topic in the bar collection. For additional examples, see Topic-Specific Configuration Settings.
Note:

You can specify any configuration on a per-topic basis except for connection.uri and topics.

Default: ""
Accepted Values: Accepted values specific to the overridden property
tasks.max (int)
The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot handle the specified level of parallelism.
Important: Messages May Be Processed Out of Order For Values Greater Than 1

If you specify a value greater than 1, the connector processes tasks in parallel. If your topic has multiple partitions (which allow consumers to read from the topic in parallel), messages may be processed out of order.

Default: 1
Accepted Values: A positive integer
heartbeat.interval.ms (int)
The length of time in milliseconds between sending heartbeat messages to record the post batch resume token when no source records have been published. Improves the resumability of the connector for low volume namespaces. Use 0 to disable.

Default: 0
Accepted Values: An integer
heartbeat.topic.name (string)
The name of the topic to publish heartbeat messages to.

Default: __mongodb_heartbeats
Accepted Values: A valid Kafka topic name

Topic-Specific Configuration Settings

The MongoDB Kafka Sink Connector can be configured to sink data from multiple topics. You can override global or default property settings with a topic-specific setting in the form of topic.override.<topicName>.<propertyName>.

Note:

The topics and connection.uri properties are global and cannot be overridden.

The following example demonstrates how to specify topic-specific settings.

topic.override.topicA.collection=collectionA
topic.override.topicA.max.batch.size=100
topic.override.topicA.document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
topic.override.topicA.post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
topic.override.topicA.value.projection.type=BlockList
topic.override.topicA.value.projection.list=k2,k4

The sink connector topic override settings instruct the connector to apply the following behavior for data consumed from topicA:

  • Write documents to the MongoDB collection collectionA in batches of up to 100.
  • Generate a UUID to be stored in the _id field for each new document.
  • Omit fields k2 and k4 from the value projection using the BlockList projection type.

Dead Letter Queue Configuration Settings

Kafka connectors send messages that cannot be processed to the dead letter queue. The connector sends invalid messages to this queue to allow for manual inspection, updates, and re-submission for processing. The dead letter queue is disabled by default, and the connector must be restarted for configuration changes to take effect.

The following example configuration enables the dead letter queue topic example.deadletterqueue, records invalid messages both in that topic and in the log file, and includes context headers in the dead letter queue messages.

errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=example.deadletterqueue
errors.deadletterqueue.context.headers.enable=true

The table below describes the configuration settings relevant to the dead letter queue.

errors.tolerance (string)
Whether to continue processing messages if an error is encountered. When set to none, the connector reports any error and blocks further processing of the remaining records. When set to all, the connector silently ignores any bad messages.

Default: "none"
Accepted Values: "none" or "all"
errors.log.enable (boolean)
Whether details of failed operations should be written to the log file. When set to true, both errors that are tolerated (determined by the errors.tolerance setting) and not tolerated are written. When set to false, errors that are tolerated are omitted.

Default: false
Accepted Values: true or false
errors.log.include.messages (boolean)
Whether to include the invalid message, including record keys, values, and headers, in the error log file.

Default: false
Accepted Values: true or false
errors.deadletterqueue.topic.name (string)
The name of the topic to use as the dead letter queue. If blank, invalid messages are not written to the dead letter queue.

Default: ""
Accepted Values: A valid Kafka topic name
errors.deadletterqueue.context.headers.enable (boolean)
Whether to provide the context headers in messages written to the dead letter queue.

Default: false
Accepted Values: true or false
errors.deadletterqueue.topic.replication.factor (int)
The number of nodes on which to replicate the dead letter queue topic. If running a single-node Kafka cluster, this must be set to 1.

Default: 3
Accepted Values: A valid number of nodes
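
For example, when testing against a single-node Kafka cluster, the replication factor must be lowered to 1:

errors.deadletterqueue.topic.name=example.deadletterqueue
errors.deadletterqueue.topic.replication.factor=1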

See the Confluent Sink Connector documentation for more information on these settings.
