Kafka Sink Connector Configuration Properties
This section lists the available configuration settings used to compose a properties file for the MongoDB Kafka Sink Connector. The connector uses these settings to determine which topics to consume data from and what data to sink to MongoDB. For an example configuration file, see MongoSinkConnector.properties.
Name | Type | Description |
---|---|---|
topics | list | A list of Kafka topics that the sink connector watches. Required Note: You can define either topics or topics.regex, but not both. Accepted Values: A comma-separated list of valid Kafka topics |
topics.regex | string | A regular expression that matches the Kafka topics that the sink connector should watch. For example, the regex activity\.\w+\.clicks$ matches topics such as "activity.landing.clicks" and "activity.support.clicks", but not "activity.landing.views" or "activity.clicks". Required Note: You can define either topics or topics.regex, but not both. Accepted Values: A valid regular expression match pattern using java.util.regex.Pattern |
connection.uri | string | The MongoDB connection URI string used to connect to MongoDB. For additional information, see the Connect your Kafka Connector to MongoDB section. Important: To avoid exposing your authentication credentials in your connection.uri setting, use a ConfigProvider and set the appropriate configuration parameters. Default: mongodb://localhost:27017 Accepted Values: A valid MongoDB connection URI string |
database | string | The name of the MongoDB database the sink writes to. Required Accepted Values: A valid MongoDB database name |
collection | string | The name of the single MongoDB collection to write data to. If the sink follows multiple topics, this is the default collection that they are mapped to. Required Accepted Values: A valid MongoDB collection name |
namespace.mapper | string | The fully qualified class name of the class that specifies which database or collection to sink the data into. The DefaultNamespaceMapper uses the database and collection settings. Default: com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper Accepted Values: A fully qualified Java class name of a class that implements the NamespaceMapper interface |
document.id.strategy | string | The class name of the class that generates a unique document _id field. Default: com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy Accepted Values: An empty string or a fully qualified Java class name |
document.id.strategy.overwrite.existing | boolean | Whether the connector should overwrite existing values in the _id field when the strategy defined in document.id.strategy is applied. Default: false Accepted Values: true or false |
document.id.strategy.uuid.format | string | Whether the connector should output the UUID in the _id field as a string or in BsonBinary format. Default: string Accepted Values: string or binary |
delete.on.null.values | boolean | Whether the connector should delete documents with matching key values when the value is null. This setting applies when using an id generation strategy that operates on the key document, such as FullKeyStrategy, PartialKeyStrategy, and ProvidedInKeyStrategy. Default: false Accepted Values: true or false |
max.batch.size | int | The maximum number of sink records to batch together for processing. Default: 0 Accepted Values: An integer |
max.num.retries | int | The number of times to retry a failed write operation. Default: 1 Accepted Values: An integer |
retries.defer.timeout | int | How long (in milliseconds) to wait before attempting a retry. Default: 5000 Accepted Values: An integer |
change.data.capture.handler | string | The class name of the CDC handler to use for processing. Default: "" Accepted Values: An empty string or a fully qualified Java class name |
field.renamer.mapping | string | An inline JSON array with objects describing field name mappings. Default: [] Accepted Values: A valid JSON array |
field.renamer.regexp | string | An inline JSON array containing regular expression statement objects. Default: [] Accepted Values: A valid JSON array |
key.projection.list | string | A list of field names to include in the key projection. Default: "" Accepted Values: A comma-separated list of field names |
key.projection.type | string | The type of key projection to use. Default: none Accepted Values: none, BlockList, or AllowList (Deprecated: blacklist, whitelist) |
post.processor.chain | list | A list of post-processor classes that process the data before saving it to MongoDB. Default: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder Accepted Values: A comma-separated list of fully qualified Java class names |
rate.limiting.every.n | int | The number of processed batches that should trigger the rate limit. A value of 0 means no rate limiting. Default: 0 Accepted Values: An integer |
rate.limiting.timeout | int | How long (in milliseconds) to wait before continuing to process data once the rate limit is reached. Default: 0 Accepted Values: An integer |
value.projection.list | string | A list of field names to include in the value projection. Default: "" Accepted Values: A comma-separated list of field names |
value.projection.type | string | The type of value projection to use. Default: none Accepted Values: none, BlockList, or AllowList (Deprecated: blacklist, whitelist) |
writemodel.strategy | string | The class that specifies the WriteModel to use for bulk writes. See Custom Write Model Strategy for more information. Default: com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy Accepted Values: A fully qualified Java class name of a class that implements WriteModelStrategy |
topic.override.<topicName>.<propertyName> | string | Per-topic configurations that override the corresponding global and default property settings. For example, topic.override.foo.collection=bar instructs the sink connector to store data from the foo topic in the bar collection. For additional examples, see Topic-Specific Configuration Settings. Note: You can specify any configuration on a per-topic basis except for connection.uri and topics. Default: "" Accepted Values: Accepted values specific to the overridden property |
tasks.max | int | The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot handle the specified level of parallelism. Important: If you specify a value greater than 1, the connector may process messages out of order. Default: 1 Accepted Values: A positive integer |
heartbeat.interval.ms | int | The length of time in milliseconds between sending heartbeat messages. Heartbeat messages contain the post batch resume token and are sent when no source records have been published in the specified interval. This improves the resumability of the connector for low volume namespaces. Use 0 to disable. Default: 0 Accepted Values: An integer |
heartbeat.topic.name | string | The name of the topic to publish heartbeat messages to. To enable the heartbeat feature, you must provide a positive value in the heartbeat.interval.ms setting. The user must consume messages on this topic to track the latest offset (post batch resume token). Default: __mongodb_heartbeats Accepted Values: A valid Kafka topic name |
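Putting the core settings together, the following is a minimal sketch of a sink connector properties file. The topic, database, and collection names are placeholders, and the connector.class line assumes the standard MongoDB sink connector class; adjust each value for your deployment.
# Illustrative sink configuration; topic, database, and collection names are placeholders
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
topics=orders
connection.uri=mongodb://localhost:27017
database=sales
collection=orders
# Optional: generate a UUID _id for each document and batch up to 100 records per write
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
max.batch.size=100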
Topic-Specific Configuration Settings
The MongoDB Kafka Sink Connector can be configured to sink data from multiple topics. You can override global or default property settings with a topic-specific setting in the form of topic.override.<topicName>.<propertyName>. The topics and connection.uri properties are global and cannot be overridden.
The following example demonstrates specifying topic-specific settings.
Example: Override Connector Sink Settings on TopicA
topic.override.topicA.collection=collectionA
topic.override.topicA.max.batch.size=100
topic.override.topicA.document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
topic.override.topicA.post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
topic.override.topicA.value.projection.type=BlockList
topic.override.topicA.value.projection.list=k2,k4
The sink connector topic override settings instruct the connector to apply the following behavior for data consumed from topicA:
- Write documents to the MongoDB collection collectionA in batches of up to 100.
- Generate a UUID to be stored in the _id field for each new document.
- Omit fields k2 and k4 from the value projection using the BlockList projection type.
FieldPathNamespaceMapper Settings
You can specify which namespace (database and/or collection) to sink a document into based on its field values by using the FieldPathNamespaceMapper. To enable this mapper, set the namespace.mapper configuration setting to the class name as shown below:
namespace.mapper=com.mongodb.kafka.connect.sink.topic.mapping.FieldPathNamespaceMapper
The FieldPathNamespaceMapper requires the following:
- At least one of the mapping configurations to a database or collection is set.
- Only one of the key or value mapping configurations is set for a mapping to a database.
- Only one of the key or value mapping configurations is set for a mapping to a collection.
You can use the following settings to customize the behavior of the FieldPathNamespaceMapper:
Name | Type | Description |
---|---|---|
namespace.mapper.key.database.field | string | The name of the key document field that specifies the name of the database to write to. |
namespace.mapper.key.collection.field | string | The name of the key document field that specifies the name of the collection to write to. |
namespace.mapper.value.database.field | string | The name of the value document field that specifies the name of the database to write to. |
namespace.mapper.value.collection.field | string | The name of the value document field that specifies the name of the collection to write to. |
namespace.mapper.error.if.invalid | boolean | Whether to throw an exception if the document is missing the mapped field or if it contains an invalid BSON type. When set to true , the connector does not process the record and may halt or skip processing depending on the related settings.When set to false and a document is missing the mapped field or if it contains an invalid BSON type, the connector defaults to writing to the specified database and collection settings.Default: false Accepted Values: true or false |
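As an illustration, the following is a minimal sketch that uses the FieldPathNamespaceMapper to route each record based on fields in its value document. The field names db and coll, as well as the fallback database and collection names, are placeholders chosen for this example.
# Route each record to the namespace named in its value document
# (the field names db and coll are placeholders)
namespace.mapper=com.mongodb.kafka.connect.sink.topic.mapping.FieldPathNamespaceMapper
namespace.mapper.value.database.field=db
namespace.mapper.value.collection.field=coll
# Fallback namespace used when the mapped fields are missing
# (namespace.mapper.error.if.invalid defaults to false)
database=sales
collection=unrouted
With this configuration, a record whose value document contains db: "sales" and coll: "orders" is written to the sales.orders namespace, while records missing those fields fall back to the database and collection settings.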
Dead Letter Queue Configuration Settings
Kafka connectors send messages that cannot be processed to the dead letter queue, which allows the invalid messages to be inspected manually, updated, and re-submitted for processing. The dead letter queue is disabled by default, and changes to its configuration take effect only after the connector is restarted.
The following is an example configuration that enables the dead letter queue topic example.deadletterqueue, specifies that invalid messages should be recorded both in that topic and in the log file, and includes context headers in the dead letter queue messages.
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=example.deadletterqueue
errors.deadletterqueue.context.headers.enable=true
The table below describes the configuration settings relevant to the dead letter queue.
Name | Type | Description |
---|---|---|
errors.tolerance | string | Whether to continue processing messages if an error is encountered. When set to none , the connector reports an error and blocks further processing of the rest of the records when it encounters an error. When set to all , the connector silently ignores any bad messages.Default: "none" Accepted Values: "none" or "all" |
errors.log.enable | boolean | Whether details of failed operations should be written to the log file. When set to true, both errors that are tolerated (determined by the errors.tolerance setting) and not tolerated are written. When set to false, errors that are tolerated are omitted. Default: false Accepted Values: true or false |
errors.log.include.messages | boolean | Whether to include the invalid message, including record keys, values, and headers, in the error log file. Default: false Accepted Values: true or false |
errors.deadletterqueue.topic.name | string | Name of topic to use as the dead letter queue. If blank, none of the invalid messages are written to the dead letter queue. Default: "" Accepted Values: A valid Kafka topic name |
errors.deadletterqueue.context.headers.enable | boolean | Whether to provide the context headers in messages written to the dead letter queue. Default: false Accepted Values: true or false |
errors.deadletterqueue.topic.replication.factor | integer | The number of nodes on which to replicate the dead letter queue topic. If running a single-node Kafka cluster, this must be set to 1. Default: 3 Accepted Values: A valid number |
See the Confluent Sink Connector documentation for more information on these settings.