Kafka Sink Connector Configuration Properties

This section lists the available configuration settings used to compose a properties file for the MongoDB Kafka Sink Connector. The connector uses these settings to determine which topics to consume data from and what data to sink to MongoDB. For an example configuration file, see MongoSinkConnector.properties.

Name Type Description
topics list
A list of Kafka topics that the sink connector watches.
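
Example

The following hypothetical setting subscribes the connector to two topics:
topics=activity.landing.clicks,activity.support.clicks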

Required
Note: You can define either topics or topics.regex, but not both.
Accepted Values: A comma-separated list of valid Kafka topics
topics.regex string
A regular expression that matches the Kafka topics that the sink connector should watch.

Example

The following regex matches topics such as
“activity.landing.clicks” and “activity.support.clicks”,
but not “activity.landing.views” or “activity.clicks”:
topics.regex=activity\\.\\w+\\.clicks$
Required
Note: You can define either topics or topics.regex, but not both.
Accepted Values: A valid regular expression match pattern using java.util.regex.Pattern.
connection.uri string
The MongoDB connection URI string used to connect to your MongoDB deployment.

Example

mongodb://username:password@localhost/

Avoid Exposing Your Authentication Credentials

To avoid exposing your authentication credentials in your connection.uri setting, use a ConfigProvider and set the appropriate configuration parameters.
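
Example

A minimal sketch using Kafka Connect's built-in FileConfigProvider, assuming a secrets file at the hypothetical path /etc/kafka/secrets.properties that contains a connection.uri entry:
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
connection.uri=${file:/etc/kafka/secrets.properties:connection.uri}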

Default: mongodb://localhost:27017
Accepted Values: A valid MongoDB connection URI string
database string
The name of the MongoDB database the sink writes to.

Required
Accepted Values: A valid MongoDB database name
collection string
The name of the MongoDB collection the sink connector writes to. If the connector consumes multiple topics, this is the default collection to which their data is mapped.

Required
Accepted Values: A MongoDB collection name
document.id.strategy string
The fully qualified class name of the strategy that generates a unique _id value for each document.
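
Example

The following setting applies the UuidStrategy shipped with the connector (also used in the topic override example later on this page):
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy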

Default: com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Accepted Values: An empty string or a fully qualified Java class name
document.id.strategy.overwrite.existing boolean
Whether the connector should overwrite existing values in the _id field when the strategy defined in document.id.strategy is applied.

Default: true
Accepted Values: true or false
delete.on.null.values boolean
Whether the connector should delete documents with matching key values when the record value is null (a tombstone record).
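
Example

To delete the matching document when the connector receives a tombstone record:
delete.on.null.values=true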

Default: false
Accepted Values: true or false
max.batch.size int
The maximum number of sink records to batch together for processing.

Default: 0
Accepted Values: An integer
max.num.retries int
The number of retries to attempt when a write error occurs.

Default: 3
Accepted Values: An integer
retries.defer.timeout int
How long (in milliseconds) to wait before retrying after a write error.
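
Example

A hypothetical retry policy that attempts up to 5 retries with a 10-second wait between attempts:
max.num.retries=5
retries.defer.timeout=10000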

Default: 5000
Accepted Values: An integer
change.data.capture.handler string
The class name of the CDC handler to use for processing.
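
Example

The following setting assumes the Debezium MongoDB CDC handler class that ships with the connector:
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler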

Default: ""
Accepted Values: An empty string or a fully qualified Java class name
field.renamer.mapping string
An inline JSON array with objects describing field name mappings.

Example

[ { "oldName":"key.fieldA", "newName":"field1" }, { "oldName":"value.xyz", "newName":"abc" } ]
Default: []
Accepted Values: A valid JSON array
field.renamer.regexp string
An inline JSON array containing regular expression statement objects.

Example

[ {"regexp":"^key\\\\..*my.*$", "pattern":"my", "replace":""}, {"regexp":"^value\\\\..*$", "pattern":"\\\\.", "replace":"_"} ]
Default: []
Accepted Values: A valid JSON array
key.projection.list string
A list of field names to include in the key projection.

Default: ""
Accepted Values: A comma-separated list of field names
key.projection.type string
The type of key projection to use.
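
Example

A hypothetical allowlist projection that keeps only the fieldA and fieldB key fields:
key.projection.type=allowlist
key.projection.list=fieldA,fieldB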

Default: none
Accepted Values: none, blocklist, or allowlist (Deprecated: blacklist, whitelist)
post.processor.chain list
A list of post-processor classes that process the data before saving it to MongoDB.
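
Example

The following chain runs the default DocumentIdAdder followed by the BlocklistValueProjector (both classes appear in the topic override example later on this page):
post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.BlocklistValueProjector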

Default: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Accepted Values: A comma-separated list of fully qualified Java class names
rate.limiting.every.n int
Number of processed batches that should trigger the rate limit. A value of 0 means no rate limiting.

Default: 0
Accepted Values: An integer
rate.limiting.timeout int
How long (in milliseconds) to wait before continuing to process data once the rate limit is reached.
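
Example

A hypothetical rate limit that pauses for one second after every 10 processed batches:
rate.limiting.every.n=10
rate.limiting.timeout=1000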

Default: 0
Accepted Values: An integer
value.projection.list string
A list of field names to include in the value projection.

Default: ""
Accepted Values: A comma-separated list of field names
value.projection.type string
The type of value projection to use.
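
Example

A blocklist projection that omits the k2 and k4 value fields, as in the topic override example later on this page:
value.projection.type=blocklist
value.projection.list=k2,k4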

Default: none
Accepted Values: none, blocklist, or allowlist (Deprecated: blacklist, whitelist)
writemodel.strategy string
The fully qualified class name of the class that specifies the WriteModel to use for bulk writes.
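
Example

The following setting assumes the UpdateOneTimestampsStrategy class that ships with the connector:
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy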

Default: com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
Accepted Values: A fully qualified Java class name of a class that implements WriteModelStrategy
topic.override.<topicName>.<propertyName> string
Per-topic configurations that override the corresponding global and default property settings. For example: topic.override.foo.collection=bar instructs the sink connector to store data from the foo topic in the bar collection. For additional examples, see Topic-Specific Configuration Settings.

Note

You can specify any configuration on a per-topic basis except for connection.uri and topics.

Default: ""
Accepted Values: Accepted values specific to the overridden property
tasks.max int
The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot handle the specified level of parallelism.

Messages May Be Processed Out of Order For Values Greater Than 1

If you specify a value greater than 1, the connector enables parallel processing of the tasks. If your topic has multiple partitions (which allow consumers to read from the topic in parallel), messages may be processed out of order.
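
Example

A hypothetical setting that processes a multi-partition topic with up to four parallel tasks, at the cost of ordering guarantees:
tasks.max=4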

Default: 1
Accepted Values: A positive integer

Topic-Specific Configuration Settings

The MongoDB Kafka Sink Connector can be configured to sink data from multiple topics. You can override global or default property settings with a topic-specific setting in the form of topic.override.<topicName>.<propertyName>.

Note

The topics and connection.uri properties are global and cannot be overridden.

The following example demonstrates specifying topic-specific settings.

Example: Override Connector Sink Settings on TopicA

topic.override.topicA.collection=collectionA
topic.override.topicA.max.batch.size=100
topic.override.topicA.document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
topic.override.topicA.post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.BlocklistValueProjector
topic.override.topicA.value.projection.type=blocklist
topic.override.topicA.value.projection.list=k2,k4

The sink connector topic override settings instruct the connector to apply the following behavior for data consumed from topicA:

  • Write documents to the MongoDB collection collectionA in batches of up to 100.
  • Generate a UUID to be stored in the _id field for each new document.
  • Omit fields k2 and k4 from the value projection using a blocklist.

Dead Letter Queue Configuration Settings

Kafka connectors send messages that cannot be processed to the dead letter queue, which allows for manual inspection, updating, and re-submission for processing. The dead letter queue is disabled by default, and you must restart the connector for changes to its configuration to take effect.

The following example configuration enables the dead letter queue topic example.deadletterqueue, records invalid messages both in that topic and in the log file, and includes context headers in the dead letter queue messages.

errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=example.deadletterqueue
errors.deadletterqueue.context.headers.enable=true

The table below describes the configuration settings relevant to the dead letter queue.

Name Type Description
errors.tolerance string
Which errors to tolerate before the message processing task fails. When set to “none”, the connector reports any error and blocks further processing of the remaining records. When set to “all”, the connector silently ignores any bad messages.

Default: “none”
Accepted Values: “none” or “all”
errors.log.enable boolean
Whether details of failed operations should be written to the log file. When set to true, both errors that are tolerated (determined by the errors.tolerance setting) and not tolerated are written. When set to false, errors that are tolerated are omitted.

Default: false
Accepted Values: true or false
errors.log.include.messages boolean
Whether to include the invalid message, including record keys, values, and headers, in the error log file.

Default: false
Accepted Values: true or false
errors.deadletterqueue.topic.name string
Name of topic to use as the dead letter queue. If blank, none of the invalid messages are written to the dead letter queue.

Default: “”
Accepted Values: A valid Kafka topic name
errors.deadletterqueue.context.headers.enable boolean
Whether to provide the context headers in messages written to the dead letter queue.

Default: false
Accepted Values: true or false
errors.deadletterqueue.topic.replication.factor integer
The number of nodes on which to replicate the dead letter queue topic. If running a single-node Kafka cluster, this must be set to 1.
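
Example

For a single-node Kafka cluster:
errors.deadletterqueue.topic.replication.factor=1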

Default: 3
Accepted Values: A valid number

See the Confluent Sink Connector documentation for more information on these settings.