
Kafka Sink Connector Configuration Properties

This section lists the available configuration settings used to compose a properties file for the MongoDB Kafka Sink Connector. The connector uses these settings to determine which topics to consume data from and what data to sink to MongoDB. For an example configuration file, see MongoSinkConnector.properties.
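
For example, a minimal sink configuration properties file might resemble the following sketch. The connector class name is the one shipped in the MongoDB Kafka Connector JAR; the name, topic, database, and collection values are placeholders you would replace with your own.

name=example-mongodb-sink
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
# Kafka topic to consume from (placeholder name)
topics=exampleTopic
# MongoDB deployment to write to
connection.uri=mongodb://localhost:27017
# Default target namespace (placeholder names)
database=exampleDb
collection=exampleCollection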

Name
Type
Description
topics
list
A list of Kafka topics that the sink connector watches.

Required
Note: You can only define either topics or topics.regex.
Accepted Values: A comma-separated list of valid Kafka topics
topics.regex
string
A regular expression that matches the Kafka topics that the sink connector should watch.
The following regex matches topics such as "activity.landing.clicks" and "activity.support.clicks", but not "activity.landing.views" or "activity.clicks":
topics.regex=activity\\.\\w+\\.clicks$
Required
Note: You can only define either topics or topics.regex.
Accepted Values: A valid regular expression match pattern using java.util.regex.Pattern.
connection.uri
string
The MongoDB connection URI string to connect to your MongoDB instance or cluster. For example: mongodb://username:password@localhost/
For additional information, see the Connect your Kafka Connector to MongoDB section.
Important
Avoid Exposing Your Authentication Credentials

To avoid exposing your authentication credentials in your connection.uri setting, use a ConfigProvider and set the appropriate configuration parameters.
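For one way to set this up with the FileConfigProvider bundled with Apache Kafka, see the sketch following this list of settings.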

Default: mongodb://localhost:27017
Accepted Values: A valid MongoDB connection URI string
database
string
The name of the MongoDB database the sink writes to.

Required
Accepted Values: A valid MongoDB database name
collection
string
The name of the single MongoDB collection the sink writes to. If the sink follows multiple topics, this is the default collection to which their data is mapped.

Required
Accepted Values: A MongoDB collection name
namespace.mapper
string
The fully qualified class name of the class that determines which database and collection to sink the data into. The default DefaultNamespaceMapper uses the values specified in the database and collection settings.
Default: com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper
Accepted Values: A fully qualified Java class name of a class that implements the NamespaceMapper interface.
document.id.strategy
string
The class name of the class that generates a unique document _id field.

Default: com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Accepted Values: An empty string or a fully qualified Java class name
document.id.strategy.overwrite.existing
boolean
Whether the connector should overwrite existing values in the _id field when the strategy defined in document.id.strategy is applied.

Default: false
Accepted Values: true or false
document.id.strategy.uuid.format
string
Whether the connector should output the UUID in the _id field as a string or in the BsonBinary format.

Default: string
Accepted Values: string or binary
delete.on.null.values
boolean
Whether the connector should delete documents with matching key values when the value is null. This setting applies when using an id generation strategy that operates on the key document, such as FullKeyStrategy, PartialKeyStrategy, and ProvidedInKeyStrategy.

Default: false
Accepted Values: true or false
max.batch.size
int
The maximum number of sink records to batch together for processing.

Default: 0
Accepted Values: An integer
max.num.retries
int
How many retries should be attempted on write errors.

Default: 1
Accepted Values: An integer
retries.defer.timeout
int
How long (in milliseconds) to defer a retry.

Default: 5000
Accepted Values: An integer
change.data.capture.handler
string
The class name of the CDC handler to use for processing.

Default: ""
Accepted Values: An empty string or a fully qualified Java class name
field.renamer.mapping
string
An inline JSON array with objects describing field name mappings.
[ { "oldName":"key.fieldA", "newName":"field1" }, { "oldName":"value.xyz", "newName":"abc" } ]
Default: []
Accepted Values: A valid JSON array
field.renamer.regexp
string
An inline JSON array containing regular expression statement objects.
[ {"regexp":"^key\\\\..*my.*$", "pattern":"my", "replace":""}, {"regexp":"^value\\\\..*$", "pattern":"\\\\.", "replace":"_"} ]
Default: []
Accepted Values: A valid JSON array
key.projection.list
string
A list of field names to include in the key projection.

Default: ""
Accepted Values: A comma-separated list of field names
key.projection.type
string
The type of key projection to use.

Default: none
Accepted Values: none, BlockList, or AllowList (Deprecated: blacklist, whitelist)
post.processor.chain
list
A list of post-processor classes that process the data before saving it to MongoDB.

Default: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Accepted Values: A comma-separated list of fully qualified Java class names
rate.limiting.every.n
int
Number of processed batches that should trigger the rate limit. A value of 0 means no rate limiting.

Default: 0
Accepted Values: An integer
rate.limiting.timeout
int
How long (in milliseconds) to wait before continuing to process data once the rate limit is reached.

Default: 0
Accepted Values: An integer
value.projection.list
string
A list of field names to include in the value projection.

Default: ""
Accepted Values: A comma-separated list of field names
value.projection.type
string
The type of value projection to use.

Default: none
Accepted Values: none, BlockList, or AllowList (Deprecated: blacklist, whitelist)
writemodel.strategy
string
The class that specifies the WriteModel to use for Bulk Writes. See Custom Write Model Strategy for more information.

Default: com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
Accepted Values: A fully qualified Java class name of a class that implements WriteModelStrategy
topic.override.<topicName>.<propertyName>
string
Per-topic configurations that override the corresponding global and default property settings. For example: topic.override.foo.collection=bar instructs the sink connector to store data from the foo topic in the bar collection. For additional examples, see Topic-Specific Configuration Settings.
Note

You can specify any configuration on a per-topic basis except for connection.uri and topics.

Default: ""
Accepted Values: Accepted values specific to the overridden property
tasks.max
int
The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot handle the specified level of parallelism.
Important
Messages May Be Processed Out of Order for Values Greater Than 1

If you specify a value greater than 1, the connector enables parallel processing of the tasks. If your topic has multiple partition logs (allows consumers to read from the topic in parallel), messages may be processed out of order.

Default: 1
Accepted Values: A positive integer
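
As noted in the connection.uri entry above, you can keep authentication credentials out of the properties file by using a ConfigProvider. The following is a minimal sketch that assumes the FileConfigProvider bundled with Apache Kafka and a separate secrets file at a placeholder path that contains a connection.uri entry:

# In the Kafka Connect worker configuration: register the file config provider
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# In the connector configuration: resolve the URI from the secrets file at runtime
connection.uri=${file:/path/to/mongodb-secrets.properties:connection.uri}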

The MongoDB Kafka Sink Connector can be configured to sink data from multiple topics. You can override global or default property settings with a topic-specific setting in the form of topic.override.<topicName>.<propertyName>.

Note

The topics and connection.uri properties are global and cannot be overridden.

The following example demonstrates specifying topic-specific settings.

topic.override.topicA.collection=collectionA
topic.override.topicA.max.batch.size=100
topic.override.topicA.document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
topic.override.topicA.post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
topic.override.topicA.value.projection.type=BlockList
topic.override.topicA.value.projection.list=k2,k4

The sink connector topic override settings instruct the connector to apply the following behavior for data consumed from topicA:

  • Write documents to the MongoDB collection collectionA in batches of up to 100.
  • Generate a UUID to be stored in the _id field for each new document.
  • Omit fields k2 and k4 from the value projection using the BlockList projection type.

You can specify which namespace (database and/or collection) to sink a document into based on its field values by using the FieldPathNamespaceMapper. To enable this mapper, set the namespace.mapper configuration setting to the class name as shown below:

namespace.mapper=com.mongodb.kafka.connect.sink.topic.mapping.FieldPathNamespaceMapper

The FieldPathNamespaceMapper requires the following:

  • at least one of the database or collection mapping configurations is set
  • only one of the key or value mapping configurations is set for a database mapping (not both)
  • only one of the key or value mapping configurations is set for a collection mapping (not both)

You can use the following settings to customize the behavior of the FieldPathNamespaceMapper:

Name
Type
Description
namespace.mapper.key.database.field
string
The name of the key document field that specifies the name of the database to write to.
namespace.mapper.key.collection.field
string
The name of the key document field that specifies the name of the collection to write to.
namespace.mapper.value.database.field
string
The name of the value document field that specifies the name of the database to write to.
namespace.mapper.value.collection.field
string
The name of the value document field that specifies the name of the collection to write to.
namespace.mapper.error.if.invalid
boolean
Whether to throw an exception when the document is missing the mapped field or the field contains an invalid BSON type.
When set to true, the connector does not process the record, and may halt or skip processing depending on the related error-handling settings.
When set to false, if a document is missing the mapped field or the field contains an invalid BSON type, the connector falls back to writing to the database and collection specified in the default settings.

Default: false
Accepted Values: true or false
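
For example, the following sketch routes each record to the collection named in a field of the record value, and falls back to the default database and collection settings when that field is missing. The field and namespace names are placeholders.

namespace.mapper=com.mongodb.kafka.connect.sink.topic.mapping.FieldPathNamespaceMapper
# Read the target collection name from this field of the record value (placeholder field name)
namespace.mapper.value.collection.field=targetCollection
# Fall back to the default namespace below instead of raising an error
namespace.mapper.error.if.invalid=false
database=exampleDb
collection=exampleCollection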

Apache Kafka version 2.6 added support for handling errant records. The Kafka connector automatically sends messages that it cannot process to the dead letter queue so that you can inspect them manually, update them, and resubmit them for processing.

The following example configuration enables the dead letter queue topic example.deadletterqueue, records invalid messages both in that topic and in the log file, and includes context headers in the dead letter queue messages.

errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=example.deadletterqueue
errors.deadletterqueue.context.headers.enable=true

The table below describes the configuration settings relevant to the dead letter queue.

Name
Type
Description
errors.tolerance
string
Whether to continue processing messages when the connector encounters an error. When set to none, the connector reports the error and stops processing the rest of the records. When set to all, the connector silently ignores any bad messages.

Default: "none"
Accepted Values: "none" or "all"
errors.log.enable
boolean
Whether details of failed operations should be written to the log file. When set to true, both tolerated and untolerated errors (as determined by the errors.tolerance setting) are written. When set to false, tolerated errors are omitted.

Default: false
Accepted Values: true or false
errors.log.include.messages
boolean
Whether to include the invalid message, including record keys, values, and headers, in the error log file.

Default: false
Accepted Values: true or false
errors.deadletterqueue.topic.name
string
Name of topic to use as the dead letter queue. If blank, none of the invalid messages are written to the dead letter queue.

Default: ""
Accepted Values: A valid Kafka topic name
errors.deadletterqueue.context.headers.enable
boolean
Whether to provide the context headers in messages written to the dead letter queue.

Default: false
Accepted Values: true or false
errors.deadletterqueue.topic.replication.factor
int
The number of nodes on which to replicate the dead letter queue topic. If running a single-node Kafka cluster, this must be set to 1.

Default: 3
Accepted Values: A valid number
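
For example, if you run a single-node Kafka cluster, such as a local development broker, you must override the replication factor as noted above:

errors.deadletterqueue.topic.replication.factor=1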

See the Confluent Sink Connector documentation for more information on these settings.
