
Kafka Sink Connector Configuration Properties

On this page

  • Sink Connector Configuration Properties
  • Time-Series Configuration Properties
  • Topic-Specific Configuration Settings
  • Example: Override Connector Sink Settings on TopicA
  • FieldPathNamespaceMapper Settings
  • Dead Letter Queue Configuration Settings

This section lists the available configuration settings used to compose a properties file for the MongoDB Kafka Sink Connector. The connector uses these settings to determine which topics to consume data from and what data to sink to MongoDB. For an example configuration file, see MongoSinkConnector.properties.

Settings
topics

Type: list
Description:
A list of Kafka topics that the sink connector watches.

Required
Note: You can define either topics or topics.regex, but not both.
Accepted Values: A comma-separated list of valid Kafka topics
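For example, the following hypothetical setting watches three topics (the names are borrowed from the regex example below):
topics=activity.landing.clicks,activity.support.clicks,activity.landing.views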
topics.regex

Type: string
Description:
A regular expression that matches the Kafka topics that the sink connector should watch.
The following regex matches topics such as "activity.landing.clicks" and "activity.support.clicks", but not "activity.landing.views" or "activity.clicks":
topics.regex=activity\\.\\w+\\.clicks$
Required
Note: You can define either topics or topics.regex, but not both.
Accepted Values: A valid regular expression match pattern using java.util.regex.Pattern.
connection.uri

Type: string
Description:
The MongoDB connection URI string the sink connector uses to connect to your MongoDB instance or cluster. For example:
mongodb://username:password@localhost/
For additional information, see the Connect your Kafka Connector to MongoDB section.
Important
Avoid Exposing Your Authentication Credentials

To avoid exposing your authentication credentials in your connection.uri setting, use a ConfigProvider and set the appropriate configuration parameters.

Default: mongodb://localhost:27017
Accepted Values: A valid MongoDB connection URI string
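As a sketch of the ConfigProvider approach mentioned above, the following lines use Kafka Connect's built-in FileConfigProvider; the secrets file path and property key are hypothetical. The first two lines belong in the Connect worker configuration, and the last line replaces the plain-text URI in the connector configuration:
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
connection.uri=${file:/etc/kafka/secrets.properties:mongodb.connection.uri}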
database

Type: string
Description:
The name of the MongoDB database the sink writes to.

Required
Accepted Values: A valid MongoDB database name
collection

Type: string
Description:
The name of the single MongoDB collection the sink writes to. If the sink consumes multiple topics, this is the default collection their data is mapped to.

Required
Accepted Values: A MongoDB collection name
server.api.version

Type: string
Description:
The API version you want to use with your MongoDB instance. For more information on the Versioned API, see the MongoDB manual entry on the Versioned API.

Default: ""
Accepted Values: An empty string or a valid server version. At this time, the only valid server version is "1".
server.api.deprecationErrors

Type: boolean
Description:
When set to true, calling a command on your MongoDB instance that is deprecated in the declared API version causes the connector to raise an exception. You can set the API version with the server.api.version configuration option. For more information on the Versioned API, see the MongoDB manual entry on the Versioned API.

Default: false
Accepted Values: true or false
server.api.strict

Type: boolean
Description:
When set to true, calling a command on your MongoDB instance that is not part of the declared API version causes the connector to raise an exception. You can set the API version with the server.api.version configuration option. For more information on the Versioned API, see the MongoDB manual entry on the Versioned API.

Default: false
Accepted Values: true or false
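For example, a hedged sketch that pins the connector to Versioned API version 1 with strict behavior and deprecation errors enabled might look like this:
server.api.version=1
server.api.strict=true
server.api.deprecationErrors=true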
namespace.mapper

Type: string
Description:
The fully qualified class name of the class that determines the database and collection in which to sink the data. The default, DefaultNamespaceMapper, uses the values specified in the database and collection settings.
Default: com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper
Accepted Values: A fully qualified Java class name of a class that implements the NamespaceMapper interface.
document.id.strategy

Type: string
Description:
The class name of the strategy class that generates a unique value for the document _id field.

Default: com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Accepted Values: An empty string or a fully qualified Java class name
document.id.strategy.overwrite.existing

Type: boolean
Description:
Whether the connector should overwrite existing values in the _id field when the strategy defined in document.id.strategy is applied.

Default: false
Accepted Values: true or false
document.id.strategy.uuid.format

Type: string
Description:
Whether the connector should output the UUID in the _id field as a string or in the BsonBinary format.

Default: string
Accepted Values: string or binary
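As an illustration only, the following sketch combines the three _id settings to store a binary UUID in each document, overwriting any existing _id value; the UuidStrategy class name is the one used in the topic override example later on this page:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
document.id.strategy.overwrite.existing=true
document.id.strategy.uuid.format=binary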
delete.on.null.values

Type: boolean
Description:
Whether the connector should delete documents with matching key values when value is null. This setting applies when using an id generation strategy that operates on the key document such as FullKeyStrategy, PartialKeyStrategy, and ProvidedInKeyStrategy.

Default: false
Accepted Values: true or false
max.batch.size

Type: int
Description:
The maximum number of sink records to batch together for processing.

Default: 0
Accepted Values: An integer
max.num.retries

Type: int
Description:
The number of times the connector retries a write operation after a write error.

Default: 1
Accepted Values: An integer
retries.defer.timeout

Type: int
Description:
The number of milliseconds the connector waits before retrying a failed write.

Default: 5000
Accepted Values: An integer
change.data.capture.handler

Type: string
Description:
The class name of the CDC handler to use for processing.

Default: ""
Accepted Values: An empty string or a fully qualified Java class name
field.renamer.mapping

Type: string
Description:
An inline JSON array with objects describing field name mappings.
[ { "oldName":"key.fieldA", "newName":"field1" }, { "oldName":"value.xyz", "newName":"abc" } ]
Default: []
Accepted Values: A valid JSON array
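In a properties file, the array is specified on a single line; for example, using one of the mappings shown above:
field.renamer.mapping=[{"oldName":"value.xyz","newName":"abc"}]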
field.renamer.regexp

Type: string
Description:
An inline JSON array containing regular expression statement objects.
[ {"regexp":"^key\\\\..*my.*$", "pattern":"my", "replace":""}, {"regexp":"^value\\\\..*$", "pattern":"\\\\.", "replace":"_"} ]
Default: []
Accepted Values: A valid JSON array
key.projection.list

Type: string
Description:
A list of field names to include in the key projection.

Default: ""
Accepted Values: A comma-separated list of field names
key.projection.type

Type: string
Description:
The type of key projection to use.

Default: none
Accepted Values: none, BlockList, or AllowList (Deprecated: blacklist, whitelist)
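For example, a hedged sketch that keeps only two hypothetical key fields by using an AllowList projection:
key.projection.type=AllowList
key.projection.list=fieldA,fieldB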
post.processor.chain

Type: list
Description:
A list of post-processor classes that process the data before saving it to MongoDB.

Default: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Accepted Values: A comma-separated list of fully qualified Java class names
rate.limiting.every.n

Type: int
Description:
Number of processed batches that should trigger the rate limit. A value of 0 means no rate limiting.

Default: 0
Accepted Values: An integer
rate.limiting.timeout

Type: int
Description:
How long (in milliseconds) to wait before continuing to process data once the rate limit is reached.

Default: 0
Accepted Values: An integer
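As an illustration with hypothetical values, the following sketch pauses processing for one second after every 50 processed batches:
rate.limiting.every.n=50
rate.limiting.timeout=1000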
value.projection.list

Type: string
Description:
A list of field names to include in the value projection.

Default: ""
Accepted Values: A comma-separated list of field names
value.projection.type

Type: string
Description:
The type of value projection to use.

Default: none
Accepted Values: none, BlockList, or AllowList (Deprecated: blacklist, whitelist)
writemodel.strategy

Type: string
Description:
The class that specifies the WriteModel to use for Bulk Writes. See Custom Write Model Strategy for more information.

Default: com.mongodb.kafka.connect.sink.writemodel.strategy.DefaultWriteModelStrategy
Accepted Values: A fully qualified Java class name of a class that implements WriteModelStrategy
topic.override.<topicName>.<propertyName>

Type: string
Description:
Per-topic configurations that override the corresponding global and default property settings. For example: topic.override.foo.collection=bar instructs the sink connector to store data from the foo topic in the bar collection. For additional examples, see Topic-Specific Configuration Settings.
Note

You can specify any configuration on a per-topic basis except for connection.uri and topics.

Default: ""
Accepted Values: Accepted values specific to the overridden property
tasks.max

Type: int
Description:
The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot handle the specified level of parallelism.
Important
Messages May Be Processed Out of Order for Values Greater Than 1

If you specify a value greater than 1, the connector processes tasks in parallel. If your topic has multiple partitions (which allow consumers to read from the topic in parallel), messages may be processed out of order.

Default: 1
Accepted Values: A positive integer

Time-Series Configuration Properties

You can configure the MongoDB Kafka Sink Connector to sink data from Kafka topics into a time-series collection.

For more information about time-series collections, see our server manual entry.

You can use the following settings to configure a time-series collection with the MongoDB Kafka Sink Connector:

Settings
timeseries.timefield

Type: string
Description:
The name of the top-level field used for time.

Default: ""
Accepted Values: An empty string or the name of a field containing a BSON DateTime type
timeseries.timefield.auto.convert.date.format

Type: string
Description:
The string pattern to convert the source data from. The setting expects the string representation to contain both date and time information and uses the Java DateTimeFormatter.ofPattern(pattern, locale) API for the conversion. If the string only contains date information, then the time since epoch is from the start of that day. If a string representation does not contain time-zone offset, then the setting interprets the extracted date and time as UTC.

Optional
Default: yyyy-MM-dd[['T'][ ]][HH:mm:ss[[.][SSSSSS][SSS]][ ]VV[ ]'['VV']'][HH:mm:ss[[.][SSSSSS][SSS]][ ]X][HH:mm:ss[[.][SSSSSS][SSS]]]
Accepted Values: A valid DateTimeFormatter format
timeseries.timefield.auto.convert

Type: boolean
Description:
Whether to convert the data in the field into a BSON Date format. When set to true, if the value is a number, the connector uses the milliseconds since epoch and discards any fractional part; if the value is a string, the connector parses it using the timeseries.timefield.auto.convert.date.format setting. If the automatic conversion fails, the value is left unchanged.

Optional
Default: false
Accepted Values: true or false
timeseries.timefield.auto.convert.locale.language.tag

Type: string
Description:
The DateTimeFormatter locale language tag to use with the date pattern (for example, 'en-US', 'en-GB', 'de', or 'fr-CA'). See Language tags in HTML and XML for more information on constructing tags.

Optional
Default: ROOT
Accepted Values: A valid Locale language tag format
timeseries.metafield

Type: string
Description:
The name of the top-level field describing the series.

Optional
Note: This field groups related data. The metafield cannot be the same as the timeField or the _id field.
Default: ""
Accepted Values: An empty string or the name of a field containing any BSON type, except for a BsonArray
timeseries.expire.after.seconds

Type: int
Description:
The number of seconds the data remains in MongoDB before MongoDB deletes it.

Optional
Default: 0
Accepted Values: A nonnegative integer
timeseries.granularity

Type: string
Description:
The expected interval between subsequent measurements in a time series.

Optional
Default: ""
Accepted Values: "", "seconds", "minutes", "hours"

For an example of how to convert an existing collection to a time-series collection, see the Time-Series Collection Example page.

Topic-Specific Configuration Settings

The MongoDB Kafka Sink Connector can be configured to sink data from multiple topics. You can override global or default property settings with a topic-specific setting of the form topic.override.<topicName>.<propertyName>.

Note

The topics and connection.uri properties are global and cannot be overridden.

Example: Override Connector Sink Settings on TopicA

The following example demonstrates specifying topic-specific settings for the topic topicA:

topic.override.topicA.collection=collectionA
topic.override.topicA.max.batch.size=100
topic.override.topicA.document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
topic.override.topicA.post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
topic.override.topicA.value.projection.type=BlockList
topic.override.topicA.value.projection.list=k2,k4

The sink connector topic override settings instruct the connector to apply the following behavior for data consumed from topicA:

  • Write documents to the MongoDB collection collectionA in batches of up to 100.
  • Generate a UUID to be stored in the _id field for each new document.
  • Omit fields k2 and k4 from the value projection using the BlockList projection type.

FieldPathNamespaceMapper Settings

You can use the FieldPathNamespaceMapper to determine the namespace (database and/or collection) to sink each document into based on the document's field values. To enable this mapper, set the namespace.mapper configuration setting to the class name as shown below:

namespace.mapper=com.mongodb.kafka.connect.sink.topic.mapping.FieldPathNamespaceMapper

The FieldPathNamespaceMapper requires the following:

  • at least one of the database or collection mapping configurations is set
  • only one of the key or value mapping configurations is set for the database mapping
  • only one of the key or value mapping configurations is set for the collection mapping

You can use the following settings to customize the behavior of the FieldPathNamespaceMapper:

Settings
namespace.mapper.key.database.field

Type: string
Description:
The name of the key document field that specifies the name of the database to write to.
namespace.mapper.key.collection.field

Type: string
Description:
The name of the key document field that specifies the name of the collection to write to.
namespace.mapper.value.database.field

Type: string
Description:
The name of the value document field that specifies the name of the database to write to.
namespace.mapper.value.collection.field

Type: string
Description:
The name of the value document field that specifies the name of the collection to write to.
namespace.mapper.error.if.invalid

Type: boolean
Description:
Whether to throw an exception if the document is missing the mapped field or if it contains an invalid BSON type.
When set to true, the connector does not process the record, and may halt or skip processing depending on the related error-handling settings.
When set to false, if a document is missing the mapped field or the field contains an invalid BSON type, the connector falls back to writing to the database and collection specified in the database and collection settings.

Default: false
Accepted Values: true or false
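As a hedged sketch with hypothetical field names, the following configuration routes each record to the database and collection named in its value document, falling back to the database and collection settings when a record is missing those fields:
namespace.mapper=com.mongodb.kafka.connect.sink.topic.mapping.FieldPathNamespaceMapper
namespace.mapper.value.database.field=db_name
namespace.mapper.value.collection.field=coll_name
namespace.mapper.error.if.invalid=false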

Dead Letter Queue Configuration Settings

Apache Kafka version 2.6 added support for handling errant records. When a dead letter queue topic is configured, the Kafka connector sends messages that it cannot process to that topic so that you can inspect them manually, update them, and resubmit them for processing.

The following example configuration enables the dead letter queue topic example.deadletterqueue. It tolerates all errors, records invalid messages (including their content) in the log file, and includes context headers in the dead letter queue messages.

mongo.errors.tolerance=all
mongo.errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=example.deadletterqueue
errors.deadletterqueue.context.headers.enable=true

The table below describes the configuration settings relevant to the dead letter queue.

Settings
mongo.errors.tolerance

Type: string
Description:
Whether to continue processing messages if the connector encounters an error. When set to none, the connector reports any error and blocks further processing of the rest of the messages. When set to all, the connector silently ignores any bad messages.

This property overrides the errors.tolerance property of the Connect Framework.

Default: "none"
Accepted Values: "none" or "all"
mongo.errors.log.enable

Type: boolean
Description:
Whether the connector should write details of failed operations to the log file. When set to true, the connector logs both tolerated errors (as determined by the errors.tolerance or mongo.errors.tolerance setting) and errors that are not tolerated. When set to false, the connector logs only the errors that are not tolerated.

This property overrides the errors.log.enable property of the Connect Framework.

Default: false
Accepted Values: true or false
errors.log.include.messages

Type: boolean
Description:
Whether to include the invalid message, including record keys, values, and headers, in the error log file.

Default: false
Accepted Values: true or false
errors.deadletterqueue.topic.name

Type: string
Description:
Name of topic to use as the dead letter queue. If blank, none of the invalid messages are written to the dead letter queue.

Default: ""
Accepted Values: A valid Kafka topic name
errors.deadletterqueue.context.headers.enable

Type: boolean
Description:
Whether to provide the context headers in messages written to the dead letter queue.

Default: false
Accepted Values: true or false
errors.deadletterqueue.topic.replication.factor

Type: int
Description:
The number of nodes on which to replicate the dead letter queue topic. If running a single-node Kafka cluster, this must be set to 1.

Default: 3
Accepted Values: A valid number
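For example, on a single-node development cluster you would set:
errors.deadletterqueue.topic.replication.factor=1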

See the Confluent Sink Connector documentation for more information on these settings.
