
Kafka Sink Connector Post-Processors

Post Processing of Documents

Post processors are sink connector classes that modify data in the SinkDocument, a class that contains a BSON representation of the SinkRecord key and value fields, after it has been read from the Kafka topic. The connector applies post processors as a chain: each post processor runs on the SinkDocument in the order you provide, and the connector stores the final result in a MongoDB collection.

Post processors perform data modification tasks such as setting the document _id field, projecting the message key or value, renaming fields, and redacting sensitive information. You can implement your own post processor by extending the PostProcessor class, or use one of the following pre-built ones:

Post Processor Name | Description

DocumentIdAdder
Full Path: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Uses a configured strategy to insert an _id field.

BlockListKeyProjector
Full Path: com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
Removes matching key fields from the sink record.

BlockListValueProjector
Full Path: com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
Removes matching value fields from the sink record.

AllowListKeyProjector
Full Path: com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector
Includes only matching key fields from the sink record.

AllowListValueProjector
Full Path: com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
Includes only matching value fields from the sink record.

KafkaMetaAdder
Full Path: com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
Adds a field composed of the concatenation of Kafka topic, partition, and offset to the document.

RenameByMapping
Full Path: com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping
Renames fields that are an exact match to a specified field name in the key or value document.

RenameByRegex
Full Path: com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex
Renames fields that match a regular expression.

You can configure the post processor chain by specifying an ordered, comma-separated list of fully-qualified PostProcessor class names:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
Note:

The DocumentIdAdder post processor is automatically added to the first position in the chain if not already present.
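
As noted above, you can implement your own post processor by extending the PostProcessor class. The following is a minimal sketch of what that might look like; it assumes the PostProcessor base class exposes a constructor that accepts a MongoSinkTopicConfig and an abstract process(SinkDocument, SinkRecord) method, as in recent versions of the connector. The class name and the "name" field are hypothetical:

import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonString;

import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.processor.PostProcessor;

// Hypothetical post processor that lowercases the "name" field of the value document.
public class LowercaseNamePostProcessor extends PostProcessor {

  public LowercaseNamePostProcessor(final MongoSinkTopicConfig config) {
    super(config);
  }

  @Override
  public void process(final SinkDocument doc, final SinkRecord orig) {
    // The value document is optional, so only modify it when present.
    doc.getValueDoc().ifPresent(valueDoc -> {
      if (valueDoc.isString("name")) {
        valueDoc.put("name",
            new BsonString(valueDoc.getString("name").getValue().toLowerCase()));
      }
    });
  }
}

To use it, place the compiled class on the Kafka worker's plugin path and append its fully-qualified name (for example, the hypothetical com.example.LowercaseNamePostProcessor) to the post.processor.chain setting.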

This section explains the available configuration options for post processors included in the MongoDB Kafka Connector.

The DocumentIdAdder post processor provides the _id field for a SinkDocument before it is written to a MongoDB collection. This post processor is configured with a strategy that contains the logic for generating the _id value. The following strategies are provided with this connector:

Strategy Name | Description

BsonOidStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Default value for the DocumentIdAdder post processor.
Generates a MongoDB BSON ObjectId.

KafkaMetaDataStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy
Builds a string composed of the concatenation of Kafka topic, partition, and offset.

FullKeyStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy
Uses the complete key structure of the SinkDocument.
Defaults to a blank document if no key exists.

ProvidedInKeyStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy
Uses the _id field specified in the key structure of the SinkDocument if it exists.
Throws an exception if the field is missing.

ProvidedInValueStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
Uses the _id field specified in the value structure of the SinkDocument if it exists.
Throws an exception if the field is missing.

PartialKeyStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
Uses a block list or allow list projection of the key structure of the SinkDocument.
Defaults to a blank document if no key exists.

PartialValueStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
Uses a block list or allow list projection of the value structure of the SinkDocument.
Defaults to a blank document if no value exists.

UuidProvidedInKeyStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInKeyStrategy
Converts the _id key field to a UUID. The value must be either a string or binary type and must conform to the UUID format.

UuidProvidedInValueStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInValueStrategy
Converts the _id value field to a UUID. The value must be either a string or binary type and must conform to the UUID format.

UuidStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
Generates a random UUID as a string.

You can assign the document.id.strategy property as follows:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy

To define a custom strategy, create a class that implements the IdStrategy interface and provide its fully-qualified class name in the document.id.strategy setting.
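
For instance, the following minimal sketch builds an _id from the Kafka topic name and a random UUID. It assumes the IdStrategy interface declares a single generateId(SinkDocument, SinkRecord) method, as in recent versions of the connector; the class name is hypothetical:

import java.util.UUID;

import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonString;
import org.bson.BsonValue;

import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy;

// Hypothetical strategy that prefixes a random UUID with the Kafka topic name.
public class TopicPrefixedUuidStrategy implements IdStrategy {

  @Override
  public BsonValue generateId(final SinkDocument doc, final SinkRecord orig) {
    return new BsonString(orig.topic() + "-" + UUID.randomUUID());
  }
}

You would then set document.id.strategy to the class's fully-qualified name, for example the hypothetical com.example.TopicPrefixedUuidStrategy.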

Note:
Selected strategy may have implications on delivery semantics

BSON ObjectId or UUID strategies can only guarantee at-least-once delivery, since new _id values are generated on retries or re-processing. The other strategies permit exactly-once delivery if the fields that form the document _id are guaranteed to be unique.

This section provides example projection configurations to show how they filter the following sample record:

{
"name": "Anonymous",
"age": 42,
"active": true,
"address": {
"city": "Unknown",
"country": "NoWhereLand"
},
"food": [
"Austrian",
"Italian"
],
"data": [
{
"k": "foo",
"v": 1
}
],
"lut": {
"key1": 12.34,
"key2": 23.45
},
"destination: { "city": "Springfield", "country": "AnotherLand" } }
Note:

The following example configurations contain [key|value] placeholder values that represent either key or value in order to avoid repetition. Specify the one appropriate to your use case when creating your configuration.

In the following example sink configuration, we specify a block list projection and the specific fields to omit from the record:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockList[Key|Value]Projector
[key|value].projection.type=BlockList
[key|value].projection.list=age,address.city,lut.key2,data.v
Note:

You can use the "." (dot) notation to reference subdocuments in the record. You can also use it to reference fields of documents within an array.

The record contains the following data after applying the projection:

{
"name": "Anonymous",
"active": true,
"address": {
"country": "NoWhereLand"
},
"food": [
"Austrian",
"Italian"
],
"data": [
{
"k": "foo"
}
],
"lut": {
"key1": 12.34
},
"destination: { "city": "Springfield", "country": "AnotherLand" } }

In the following example sink configuration, we specify an allow list projection and the specific fields to include in the record:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowList[Key|Value]Projector
[key|value].projection.type=AllowList
[key|value].projection.list=age,address.city,lut.key2,data.v
Note:

You can use the "." notation to reference subdocuments in the record. You can also use it to reference fields of documents within an array.

The record contains the following data after applying the projection:

{
"age": 42,
"address": {
"city": "Unknown"
},
"data": [
{
"v": 1
}
],
"lut": {
"key2": 23.45
}
}

The previous example projection configurations demonstrated exact string matching on field names. The projection list setting also supports the following wildcard patterns for matching field names:

  • "*" (star): matches a string of any length for the level in the document in which it is specified.
  • "**" (double star): matches the current and all nested levels from which it is specified.

The examples below demonstrate how to use each wildcard pattern and the projection output from the following sample record:

{
"forecast": [
{ "day": "Monday",
"temp": {
"degrees": 25.3,
"units": "C",
"range": {
"low": 23.0,
"high": 28.7
}
},
"uv": 5
}
],
"city": "Springfield",
"population: { "qty": 30.7, "scale": 1000, "units": "people" } }

Allow List Wildcard Examples

The * wildcard pattern in the example below matches all the keys named temp in the forecast array and all fields nested a single level below it.

[key|value].projection.type=AllowList
[key|value].projection.list=city,forecast.temp.*

The record contains the following data after applying the projection:

{
"city": "Springfield",
"forecast": [
{
"temp": {
"degrees": 25.3,
"units": "C",
"range": {
"low": 23.0,
"high": 28.7
}
}
}
]
}

The ** wildcard pattern in the example below matches all the keys for all levels that contain the field scale.

[key|value].projection.type=AllowList
[key|value].projection.list=**.scale

The record contains the following data after applying the projection:

{
"population: { "qty": 30.7, "scale": 1000, "units": "people" } }

Block List Wildcard Examples

The wildcard character can also be used to match all field names at specific levels as demonstrated in the following block list projection configuration example:

[key|value].projection.type=BlockList
[key|value].projection.list=population,forecast.*.*

The record contains the following data after applying the projection:

{
"forecast": [
{
"day": "Monday",
"uv": 5
}
],
"city": "Springfield",
}

The connector configuration also supports the ** (double star) wildcard which matches the current and all nested levels from which it is specified.

[key|value].projection.type=BlockList
[key|value].projection.list=**.high

The record contains the following data after applying the projection:

{
"city": "Springfield",
"population": {
"qty": 30.7,
"scale": 1000,
"units": "people"
}
}

Field Renaming Post Processors

This section provides example configurations for the RenameByMapping and RenameByRegex post processors to show how they update field names in a sink record. The field renaming parameters specify whether to update the key or value document in the record using dot notation as well as the pattern to match. You must specify the RenameByMapping and RenameByRegex properties with your parameters in a JSON array.

The field renaming post processor examples use the following sample sink record:

Sample Key Document

{
"location": "Provence",
"date_month": "October",
"date_day": 17
}

Sample Value Document

{
"flapjacks": {
"purchased": 598,
"size": "large"
}
}

The RenameByMapping post processor setting specifies one or more objects that assign fields matching a string to a new name in a key or value document.

Each object contains the text to match in the oldName element and the replacement text in the newName element as described in the table below.

Key Name | Description

oldName
Specifies whether to match a key or value document and an appended string that matches the field to replace.

newName
Contains the replacement text for all matches of the field defined in the oldName field.

The following example property matches the "location" field of a Key document and renames it to "country":

field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]

This transforms the original key document to the following one after applying the RenameByMapping post processor:

{
"country": "Provence",
"date_month": "October",
"date_day": 17
}

You can perform a similar field name assignment for value documents by specifying value with the appended field name in the oldName field as follows:

field.renamer.mapping=[{"oldName":"value.location", "newName":"city"}]

This transforms the original value document to the following one after applying the RenameByMapping post processor:

{
"crepes": {
"purchased": 598,
"size": "large"
}
}

You can also specify multiple mappings in the field.renamer.mapping property by using a stringified JSON array as shown in the following example:

field.renamer.mapping="[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]"

The RenameByRegex post processor setting is an array of objects. Each object in the array contains the following JSON element keys:

Key Name | Description

regexp
Contains a regular expression that matches the fields on which to perform the replacement.

pattern
Contains a regular expression that matches the text to replace.

replace
Contains the replacement text for all matches of the regular expression defined in the pattern field.

Example

field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"},{"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]

When the post processor is applied to the sample key document and the sample value document, it produces the following output:

Key Document

{
"location": "Provence",
"date-month": "October",
"date-day": 17
}

Value Document

{
"crepes": {
"quantity": 598,
"size": "large"
}
}

The post processor applied the following changes:

  • All field names in the key document of the sink record that started with "date" are matched. In the matched fields, all instances of "_" are replaced with "-".
  • All field names in the value document of the sink record that are subdocuments of crepes are matched. In the matched fields, all instances of "purchased" are replaced with "quantity".
Tip:
Ensure renaming does not result in duplicate keys in the same document

The renaming post processors update the key fields of a JSON document which can result in duplicate keys within a document. They skip the renaming step if the replacement key already exists at the current level.

Write Model Strategies

A write model defines the behavior of bulk write operations made on a MongoDB collection. The default write model for the connector is ReplaceOneModel with ReplaceOptions set to upsert mode.

You can override the default write model by specifying a custom one in the writemodel.strategy configuration setting. The following strategies are provided with the connector:

Write Model | Description

ReplaceOneDefaultStrategy
Replaces at most one document that matches the current document by the _id field.
Default value for the writemodel.strategy configuration setting.

ReplaceOneBusinessKeyStrategy
Replaces at most one document that matches the filters provided by the document.id.strategy setting.
Set the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

DeleteOneDefaultStrategy
Deletes at most one document that matches the id specified by the document.id.strategy setting, and only when the sink record contains a null value.
Implicitly specified when the configuration setting mongodb.delete.on.null.values=true is set.
You can set this explicitly with the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy

UpdateOneTimestampsStrategy
Adds _insertedTS (inserted timestamp) and _modifiedTS (modified timestamp) fields to documents.
Set the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

UpdateOneBusinessKeyTimestampStrategy
Adds _insertedTS (inserted timestamp) and _modifiedTS (modified timestamp) fields to documents that match the filters provided by the document.id.strategy setting.
Set the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy

If none of the pre-built write model strategies suits your needs, you can create your own. A custom write model strategy is a Java class that implements WriteModelStrategy.

To configure your Sink Connector to use the custom WriteModelStrategy, follow the steps below:

  1. Create a class that implements the WriteModelStrategy interface and overrides the createWriteModel(SinkDocument) method.
  2. Compile the class to a .class file or JAR.
  3. Add the class to the Class Path / Plugin Path for Kafka workers. For more information about plugin paths, see the Confluent documentation.
  4. Specify your Java class in the writemodel.strategy configuration setting.

The following is an example of a custom write strategy. It extracts the value of fullDocument from the value document and returns a ReplaceOne operation. The ReplaceOne operation replaces a document in the MongoDB collection that matches the ID from the value document.

import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;

/**
 * A custom write model strategy.
 *
 * This example takes the 'fullDocument' field from a change stream event and
 * creates a ReplaceOne operation.
 */
public class CustomWriteModelStrategy implements WriteModelStrategy {
  private static final String ID = "_id";

  @Override
  public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
    BsonDocument changeStreamDocument = document.getValueDoc()
        .orElseThrow(() -> new DataException("Missing value document"));
    BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
    if (fullDocument.isEmpty()) {
      return null; // Return null to indicate a no-op.
    }
    return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
  }
}
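
Assuming the class above is packaged under a hypothetical com.example package and placed on the worker's plugin path, you would register it in the sink configuration as follows:

writemodel.strategy=com.example.CustomWriteModelStrategy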

A business key is a value composed of data within the sink record that identifies it as a unique document. This example defines a business key using data contained within multiple fields of the record; BSON ObjectIds are generated for inserted documents but left unchanged on updates.

Follow the steps below to configure this strategy:

  1. Create a unique index that corresponds to your business key in your target MongoDB collection.
  2. In the connector configuration, specify the PartialValueStrategy as the id strategy to identify the fields that belong to the business key.
  3. In the connector configuration, specify the ReplaceOneBusinessKeyStrategy writemodel strategy.

For this example, we track airplane capacity by the flight number and airport location represented by flight_no and airport_code, respectively. An example message contains the following:

{
"flight_no": "Z342",
"airport_code": "LAX",
"passengers": {
"capacity": 180,
"occupied": 152
}
}

To implement the strategy, we first create a unique index on the flight_no and airport_code fields in the MongoDB shell:

db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })

Next, we specify the PartialValueStrategy as the id strategy, identify the fields that belong to the business key, and set the ReplaceOneBusinessKeyStrategy write model strategy in the configuration file:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=flight_no,airport_code
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

The sample data inserted into the collection contains the following:

{
"_id": ObjectId('5db0b81199b7fbcc89631d06'),
"flight_no": "Z342"
"airport_code": "LAX",
"passengers_spaces": {
"capacity": 180,
"occupied": 152
}
}

When the connector processes sink data that matches an existing document with the same business key field values, it updates the document with the new values without changing the _id field.

{
"_id": ObjectId('5db0b81199b7fbcc89631d06'),
"flight_no": "Z342"
"airport_code": "LAX",
"passengers_spaces": {
"capacity": 180,
"occupied": 95
}
}

This example shows how to track the create and update timestamps of documents that the connector upserts. The UpdateOneTimestampsStrategy write model strategy performs the following tasks:

  • When a new MongoDB document is inserted into the collection by the connector, the _insertedTS and _modifiedTS fields will be set to the current time.
  • When an existing MongoDB document is updated in the collection by the connector, the _modifiedTS field will be updated to the current time.

This is set by specifying the UpdateOneTimestampsStrategy in the configuration file as follows:

writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

For this example, we want to track the position of a train along its route. The _insertedTS field provides us with the time and date that the first position report was saved into the collection.

The position report includes the following data in the value document:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40.8051693, -73.9388079 ]
}

The writemodel strategy is set to UpdateOneTimestampsStrategy to append the created and modified timestamps and the document id strategy is set to ProvidedInValueStrategy to identify the train using the _id field of the value document.

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

The MongoDB document inserted after processing the initial message for the train includes the following data:

{
"_id": "MN-1234",
"_insertedTS": ISODate('2019-10-23T15:08:000Z"), "_modifiedTS": ISODate("2019-10-23T15:08:000Z"), "start": "Beacon", "destination": "Grand Central" "position": [ 40.805, -73.939 ] }

After an hour, the train reports its current location along its route. The position and _modifiedTS fields are updated:

{
"_id": "MN-1234",
"_insertedTS": ISODate('2019-10-23T15:08:000Z"), "_modifiedTS": ISODate("2019-10-23T16:08:000Z"), "start": "Beacon", "destination": "Grand Central" "position": [ 41.156, -73.870 ] }