Kafka Sink Connector Configuration Properties

This section lists the available configuration settings used to compose a properties file for the MongoDB Kafka Sink Connector. The connector uses these settings to determine which topics to consume data from and what data to sink to MongoDB. For an example configuration file, see MongoSinkConnector.properties.
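As a minimal sketch of how the required settings fit together (the connector name, topic, database, and collection values below are placeholders):

```properties
name=mongo-sink
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
topics=topicA
connection.uri=mongodb://localhost:27017
database=myDb
collection=myColl
```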

Name Type Description
topics list
A list of Kafka topics for the sink connector.

Required
Accepted Values: A comma-separated list of valid Kafka topics
connection.uri string
The MongoDB connection URI string the sink connector uses to connect to your MongoDB instance or cluster.

Example

mongodb://username:password@localhost/
Default: mongodb://localhost:27017
Accepted Values: A valid MongoDB connection URI string
database string
The name of the MongoDB database the sink writes to.

Required
Accepted Values: A valid MongoDB database name
collection string
The name of the single MongoDB collection the sink writes to. If the sink consumes multiple topics, this is the default collection their data is mapped to.

Required
Accepted Values: A MongoDB collection name
document.id.strategy string
The fully qualified class name of the strategy that generates a unique document _id field.

Default: com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Accepted Values: An empty string or a fully qualified Java class name
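For example, to generate UUID values for the _id field instead of ObjectIds (UuidStrategy is the same strategy used in the topic override example later on this page):

```properties
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
```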
delete.on.null.values boolean
Whether the connector should delete documents with matching key values when the record value is null (a tombstone record).

Default: false
Accepted Values: true or false
max.batch.size int
The maximum number of sink records to batch together for processing.

Default: 0
Accepted Values: An integer
max.num.retries int
The number of times to retry a write that fails.

Default: 3
Accepted Values: An integer
retries.defer.timeout int
How long (in milliseconds) to wait before the next retry attempt.

Default: 5000
Accepted Values: An integer
change.data.capture.handler string
The fully qualified class name of the change data capture (CDC) handler to use for processing.

Default: ""
Accepted Values: An empty string or a fully qualified Java class name
field.renamer.mapping string
An inline JSON array with objects describing field name mappings.

Example

[ { "oldName":"key.fieldA", "newName":"field1" }, { "oldName":"value.xyz", "newName":"abc" } ]
Default: []
Accepted Values: A valid JSON array
field.renamer.regexp string
An inline JSON array containing regular expression statement objects.

Example

[ {"regexp":"^key\\\\..*my.*$", "pattern":"my", "replace":""}, {"regexp":"^value\\\\..*$", "pattern":"\\\\.", "replace":"_"} ]
Default: []
Accepted Values: A valid JSON array
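Note that the quadruple backslashes in the example are unescaped twice, once by the properties file parser and once by the JSON parser, leaving single-escaped regular expressions. The following is a rough Python sketch of how such rules could behave, for illustration only (not the connector's actual implementation): each rule's regexp selects matching field paths, and every occurrence of pattern in a selected path is replaced with replace.

```python
import re

# Illustrative sketch of field.renamer.regexp semantics; the rules mirror the
# example above after both layers of unescaping.
RULES = [
    {"regexp": r"^key\..*my.*$", "pattern": "my", "replace": ""},
    {"regexp": r"^value\..*$", "pattern": r"\.", "replace": "_"},
]

def rename_field(path: str, rules=RULES) -> str:
    """Apply each rule whose regexp matches the fully qualified field path."""
    for rule in rules:
        if re.match(rule["regexp"], path):
            path = re.sub(rule["pattern"], rule["replace"], path)
    return path

print(rename_field("key.myField"))  # -> key.Field
print(rename_field("value.a.b"))    # -> value_a_b
```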
key.projection.list string
A list of field names to include in the key projection.

Default: ""
Accepted Values: A comma-separated list of field names
key.projection.type string
The type of key projection to use.

Default: none
Accepted Values: none, blacklist, or whitelist
post.processor.chain list
A list of post-processor classes that process the data before saving it to MongoDB.

Default: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Accepted Values: A comma-separated list of fully qualified Java class names
rate.limiting.every.n int
Number of processed batches that should trigger the rate limit. A value of 0 means no rate limiting.

Default: 0
Accepted Values: An integer
rate.limiting.timeout int
How long (in milliseconds) to wait before continuing to process data once the rate limit is reached.

Default: 0
Accepted Values: An integer
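The two rate limiting settings work together: after every N processed batches, the connector pauses for the timeout before continuing. A rough Python sketch of that behavior (the connector applies this internally; the function and parameter names here are only for illustration):

```python
import time

def process_batches(batches, every_n=0, timeout_ms=0):
    """Sketch of rate.limiting.every.n / rate.limiting.timeout behavior."""
    processed = 0
    for batch in batches:
        # ... write the batch to MongoDB ...
        processed += 1
        # every_n == 0 means no rate limiting at all
        if every_n > 0 and processed % every_n == 0:
            time.sleep(timeout_ms / 1000.0)  # defer further processing
    return processed
```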
value.projection.list string
A list of field names to include in the value projection.

Default: ""
Accepted Values: A comma-separated list of field names
value.projection.type string
The type of value projection to use.

Default: none
Accepted Values: none, blacklist, or whitelist
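For instance, to drop two value fields (the field names here are illustrative), pair the projection type with a projection list; the topic override example later on this page additionally adds BlacklistValueProjector to the post-processor chain for the same purpose:

```properties
value.projection.type=blacklist
value.projection.list=k2,k4
```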
writemodel.strategy string
The fully qualified class name of the strategy that specifies the WriteModel to use for bulk writes.

Default: com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
Accepted Values: A fully qualified Java class name of a class that implements WriteModelStrategy
topic.override.<topicName>.<propertyName> string
Per-topic configurations that override the corresponding global and default property settings. For example: topic.override.foo.collection=bar instructs the sink connector to store data from the foo topic in the bar collection. For additional examples, see Topic-Specific Configuration Settings.

Note

You can specify any configuration on a per-topic basis except for connection.uri and topics.

Default: ""
Accepted Values: Accepted values specific to the overridden property

Topic-Specific Configuration Settings

The MongoDB Kafka Sink Connector can be configured to sink data from multiple topics. You can override global or default property settings with a topic-specific setting in the form of topic.override.<topicName>.<propertyName>.

Note

The topics and connection.uri properties are global and cannot be overridden.

The following example demonstrates specifying topic-specific settings.

Example: Override Connector Sink Settings on TopicA

topic.override.topicA.collection=collectionA
topic.override.topicA.max.batch.size=100
topic.override.topicA.document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
topic.override.topicA.post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.BlacklistValueProjector
topic.override.topicA.value.projection.type=blacklist
topic.override.topicA.value.projection.list=k2,k4

The sink connector topic override settings instruct the connector to apply the following behavior for data consumed from topicA:

  • Write documents to the MongoDB collection collectionA in batches of up to 100.
  • Generate a UUID to be stored in the _id field of each new document.
  • Apply the DocumentIdAdder and BlacklistValueProjector post processors to each record.
  • Omit fields k2 and k4 from the value projection using a blacklist.