Kafka Sink Connector Post-Processors

Post Processing of Documents

Post processors are sink connector classes that modify data in the SinkDocument, a class that contains a BSON representation of the SinkRecord key and value fields, after it has been read from the Kafka topic. The connector applies the post processors as a chain, executing each one on the SinkDocument in the order provided, and stores the result in a MongoDB collection.

Post processors perform data modification tasks such as setting the document _id field, projecting the message key or value, renaming fields, and redacting sensitive information. You can implement your own post processor by extending the PostProcessor class (a sketch follows the chain configuration example below) or use one of the following pre-built ones:

Post Processors
DocumentIdAdder

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Uses a configured strategy to insert an _id field.
BlockListKeyProjector

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
Removes matching key fields from the sink record.
BlockListValueProjector

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
Removes matching value fields from the sink record.
AllowListKeyProjector

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector
Includes only matching key fields from the sink record.
AllowListValueProjector

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
Includes only matching value fields from the sink record.
KafkaMetaAdder

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
Adds a field composed of the concatenation of Kafka topic, partition, and offset to the document.
RenameByMapping

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping
Renames fields that are an exact match to a specified field name in the key or value document.
RenameByRegex

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex
Renames fields that match a regular expression.

You can configure the post processor chain by specifying an ordered, comma-separated list of fully-qualified PostProcessor class names:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
Note

The DocumentIdAdder post processor is automatically added to the first position in the chain if not already present.
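
If the pre-built post processors do not cover your use case, you can supply your own class. The following is a minimal sketch of a custom post processor that redacts a sensitive field. It assumes the PostProcessor base class exposes a constructor accepting the topic configuration and an abstract process method, as in the connector source; the RedactFieldPostProcessor class name and the ssn field are hypothetical.

import org.apache.kafka.connect.sink.SinkRecord;

import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.processor.PostProcessor;

/**
 * A sketch of a custom post processor that removes a hypothetical
 * "ssn" field from the value document before it is written to MongoDB.
 */
public class RedactFieldPostProcessor extends PostProcessor {

    public RedactFieldPostProcessor(final MongoSinkTopicConfig config) {
        super(config);
    }

    @Override
    public void process(final SinkDocument doc, final SinkRecord orig) {
        // Remove the sensitive field if a value document is present.
        doc.getValueDoc().ifPresent(valueDoc -> valueDoc.remove("ssn"));
    }
}

You could then add the class to the chain, for example: post.processor.chain=com.example.RedactFieldPostProcessor (the com.example package is hypothetical).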

This section explains the available configuration options for post processors included in the MongoDB Kafka Connector.

The DocumentIdAdder post processor provides the _id field for a SinkDocument before it is written to a MongoDB collection. This post processor is configured using a strategy that contains the logic for generating the value of the _id. The following strategies are provided with this connector:

Strategy
BsonOidStrategy

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Default value for the DocumentIdAdder post processor.
Generates a MongoDB BSON ObjectId.
KafkaMetaDataStrategy

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy
Builds a string composed of the concatenation of Kafka topic, partition, and offset.
FullKeyStrategy

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy
Uses the complete key structure of the SinkDocument.
Defaults to a blank document if no key exists.
ProvidedInKeyStrategy

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy
Uses the _id field specified in the key structure of the SinkDocument if it exists.
Throws an exception if the field is missing.
ProvidedInValueStrategy

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
Uses the _id field specified in the value structure of the SinkDocument if it exists.
Throws an exception if the field is missing.
PartialKeyStrategy

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
Uses a block list or allow list projection of the key structure of the SinkDocument.
Defaults to a blank document if no key exists.
PartialValueStrategy

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
Uses a block list or allow list projection of the value structure of the SinkDocument.
Defaults to a blank document if no value exists.
UuidProvidedInKeyStrategy

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.UuidProvidedInKeyStrategy
Converts the _id key field to a UUID. The value must be either a string or binary type and must conform to the UUID format.
UuidProvidedInValueStrategy

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.UuidProvidedInValueStrategy
Converts the _id value field to a UUID. The value must be either a string or binary type and must conform to the UUID format.
UuidStrategy

Description:
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
Generates a random UUID as a string.

You can assign the document.id.strategy property as follows:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy

To define a custom strategy, create a class that implements the IdStrategy interface and provide its fully-qualified class name in the document.id.strategy setting.
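
As an illustration, the following sketch builds a string _id by joining two value fields. The RegionSerialIdStrategy class name and the region and serial fields are hypothetical, and the IdStrategy interface is assumed to declare a single generateId(SinkDocument, SinkRecord) method as in the connector source.

import org.apache.kafka.connect.sink.SinkRecord;

import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonValue;

import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy;

/**
 * A sketch of a custom _id strategy that joins the hypothetical
 * "region" and "serial" value fields into a string such as "EU-12345".
 */
public class RegionSerialIdStrategy implements IdStrategy {

    @Override
    public BsonValue generateId(final SinkDocument doc, final SinkRecord orig) {
        // Fall back to an empty document if the record has no value document.
        BsonDocument value = doc.getValueDoc().orElseGet(BsonDocument::new);
        String region = value.getString("region", new BsonString("unknown")).getValue();
        String serial = value.getString("serial", new BsonString("unknown")).getValue();
        return new BsonString(region + "-" + serial);
    }
}

You would then reference the class in the configuration, for example: document.id.strategy=com.example.RegionSerialIdStrategy (the com.example package is hypothetical).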

Note
The selected strategy may have implications for delivery semantics

BSON ObjectId or UUID strategies can only guarantee at-least-once delivery since new ids would be generated on retries or re-processing. Other strategies permit exactly-once delivery if the fields that form the document _id are guaranteed to be unique.

Projections

This section provides example projection configurations to show how they filter the following sample record:

{
"name": "Anonymous",
"age": 42,
"active": true,
"address": {
"city": "Unknown",
"country": "NoWhereLand"
},
"food": [
"Austrian",
"Italian"
],
"data": [
{
"k": "foo",
"v": 1
}
],
"lut": {
"key1": 12.34,
"key2": 23.45
},
"destination: {
"city": "Springfield",
"country": "AnotherLand"
}
}
Note

The following example configurations contain [key|value] placeholder values that represent either key or value in order to avoid repetition. Specify the one appropriate to your use case when creating your configuration.

In the following example sink configuration, we specify a block list projection and the specific fields to omit from the record:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockList[Key|Value]Projector
[key|value].projection.type=BlockList
[key|value].projection.list=age,address.city,lut.key2,data.v
Note

You can use the "." (dot) notation to reference subdocuments in the record. You can also use it to reference fields of documents within an array.

The record contains the following data after applying the projection:

{
"name": "Anonymous",
"active": true,
"address": {
"country": "NoWhereLand"
},
"food": [
"Austrian",
"Italian"
],
"data": [
{
"k": "foo"
}
],
"lut": {
"key1": 12.34
},
"destination: {
"city": "Springfield",
"country": "AnotherLand"
}
}

In the following example sink configuration, we specify an allow list projection and the specific fields to include in the record:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowList[Key|Value]Projector
[key|value].projection.type=AllowList
[key|value].projection.list=age,address.city,lut.key2,data.v
Note

You can use the "." notation to reference subdocuments in the record. You can also use it to reference fields of documents within an array.

The record contains the following data after applying the projection:

{
"age": 42,
"address": {
"city": "Unknown"
},
"data": [
{
"v": 1
}
],
"lut": {
"key2": 23.45
}
}

Wildcard Pattern Matching

The previous example projection configurations demonstrated exact string matching on field names. The projection list setting also supports the following wildcard patterns for matching field names:

  • "*" (star): matches a string of any length for the level in the document in which it is specified.
  • "**" (double star): matches the current and all nested levels from which it is specified.

The examples below demonstrate how to use each wildcard pattern and show the projection output for the following sample record:

{
"forecast": [
{ "day": "Monday",
"temp": {
"degrees": 25.3,
"units": "C",
"range": {
"low": 23.0,
"high": 28.7
}
},
"uv": 5
}
],
"city": "Springfield",
"population: {
"qty": 30.7,
"scale": 1000,
"units": "people"
}
}

Allow List Wildcard Examples

The * wildcard pattern in the example below matches the temp key in each element of the forecast array and all fields nested a single level below it. Matched subdocuments, such as range, are included in full.

[key|value].projection.type=AllowList
[key|value].projection.list=city,forecast.temp.*

The record contains the following data after applying the projection:

{
"city": "Springfield",
"forecast": [
{
"temp": {
"degrees": 25.3,
"units": "C",
"range": {
"low": 23.0,
"high": 28.7
}
}
}
]
}

The ** wildcard pattern in the example below matches all keys, at any level, that contain a field named scale.

[key|value].projection.type=AllowList
[key|value].projection.list=**.scale

The record contains the following data after applying the projection:

{
"population: {
"qty": 30.7,
"scale": 1000,
"units": "people"
}
}

Block List Wildcard Examples

The wildcard character can also be used to match all field names at specific levels as demonstrated in the following block list projection configuration example:

[key|value].projection.type=BlockList
[key|value].projection.list=population,forecast.*.*

The record contains the following data after applying the projection:

{
"forecast": [
{
"day": "Monday",
"uv": 5
}
],
"city": "Springfield",
}

The connector configuration also supports the ** (double star) wildcard, which matches the current and all nested levels from which it is specified.

[key|value].projection.type=BlockList
[key|value].projection.list=**.high

The record contains the following data after applying the projection:

{
"city": "Springfield",
"population: {
"qty": 30.7,
"scale": 1000,
"units": "people"
}
}

Field Renaming Post Processors

This section provides example configurations for the RenameByMapping and RenameByRegex post processors to show how they update field names in a sink record. The field renaming parameters use dot notation to specify whether to update the key or value document in the record, as well as the pattern to match. You must specify the RenameByMapping and RenameByRegex properties with your parameters in a JSON array.

The field renaming post processor examples use the following sample sink record:

Sample Key Document

{
"location": "Provence",
"date_month": "October",
"date_day": 17
}

Sample Value Document

{
"flapjacks": {
"purchased": 598,
"size": "large"
}
}

The RenameByMapping post processor setting specifies one or more objects, each of which maps an exact field name in a key or value document to a new name.

Each object contains the text to match in the oldName element and the replacement text in the newName element as described in the table below.

Key Name
Description
oldName
A string that begins with key or value, indicating which document to match, followed by a dot and the name of the field to rename.
newName
Contains the replacement text for all matches of the field defined in the oldName field.

The following example property matches the "location" field of a Key document and renames it to "country":

field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]

This transforms the original key document to the following one after applying the RenameByMapping post processor:

{
"country": "Provence",
"date_month": "October",
"date_day": 17
}

You can perform a similar field name assignment for value documents by specifying value with the appended field name in the oldName field as follows:

field.renamer.mapping=[{"oldName":"value.crepes", "newName":"flapjacks"}]

This transforms the original value document to the following one after applying the RenameByMapping post processor:

{
"crepes": {
"purchased": 598,
"size": "large"
}
}

You can also specify multiple mappings in the field.renamer.mapping property by using a stringified JSON array as shown in the following example:

field.renamer.mapping="[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]"

The RenameByRegex post processor setting is an array of objects. Each object in the array contains the following JSON element keys:

Key Name
Description
regexp
Contains a regular expression that matches the fields on which to perform the replacement.
pattern
Contains a regular expression that matches the text to replace within the matched fields.
replace
Contains the replacement text for all matches of the regular expression defined in the pattern field.

Example

field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"},{"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]

When the post processor is applied to the sample key document and the sample value document, it produces the following output:

Key Document

{
"location": "Provence",
"date-month": "October",
"date-day": 17
}

Value Document

{
"crepes": {
"quantity": 598,
"size": "large"
}
}

The post processor applied the following changes:

  • All field names in the key document of the sink record that start with "date" are matched. In the matched fields, all instances of "_" are replaced with "-".
  • All field names in the value document of the sink record that are subdocuments of crepes are matched. In the matched fields, all instances of "purchased" are replaced with "quantity".
Tip
Ensure renaming does not result in duplicate keys in the same document

The renaming post processors update the key fields of a JSON document which can result in duplicate keys within a document. They skip the renaming step if the replacement key already exists at the current level.

Write Model Strategies

A write model defines the behavior of bulk write operations made on a MongoDB collection. The default write model for the connector is ReplaceOneModel with ReplaceOptions set to upsert mode.

You can override the default write model by specifying a custom one in the writemodel.strategy configuration setting. The following strategies are provided with the connector:

Write Model
DefaultWriteModelStrategy

Description:
Uses ReplaceOneDefaultStrategy by default, and InsertOneDefaultStrategy if you set the timeseries.timefield option.
Default value for writemodel.strategy configuration setting.
InsertOneDefaultStrategy

Description:
Creates an InsertOneModel.
Set the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.InsertOneDefaultStrategy
ReplaceOneDefaultStrategy

Description:
Replaces at most one document that matches the current document by the _id field.
Set the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
ReplaceOneBusinessKeyStrategy

Description:
Replaces at most one document that matches filters provided by the document.id.strategy setting.
Set the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy
Tip
See also:

See the ReplaceOneBusinessKeyStrategy usage example later on this page.

DeleteOneDefaultStrategy

Description:
Deletes at most one document that matches the id specified by the document.id.strategy setting, but only when the sink record contains a null value.
Implicitly specified when the configuration setting mongodb.delete.on.null.values=true is set.
You can set this explicitly with the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy
DeleteOneBusinessKeyStrategy

Description:
Deletes at most one document that matches filters provided in the document.id.strategy setting.
Set the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy
Tip
See also:

See the DeleteOneBusinessKeyStrategy usage example later on this page.

UpdateOneTimestampsStrategy

Description:
Adds _insertedTS (inserted timestamp) and _modifiedTS (modified timestamp) fields to documents.
Set the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
UpdateOneBusinessKeyTimestampStrategy

Description:
Adds _insertedTS (inserted timestamp) and _modifiedTS (modified timestamp) fields to documents that match the filters provided by the document.id.strategy setting.
Set the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy

If none of the pre-built write strategies suits your needs, you can create your own. A custom write model strategy is a Java class that implements WriteModelStrategy.

To configure your Sink Connector to use the custom WriteModelStrategy, follow the steps below:

  1. Create a class that implements the WriteModelStrategy interface and overrides the createWriteModel(SinkDocument) method.
  2. Compile the class to a JAR file.
  3. Add the compiled JAR to the Class Path / Plugin Path for Kafka workers. For more information about plugin paths, see the Confluent documentation.

    Note

    Plugins are loaded in isolation, so when deploying a custom write strategy, both the connector JAR and the custom write model strategy JAR should be on the same path as shown in the following example:

    mongodb-kafka-connect/lib/mongo-kafka-connect-all.jar
    mongodb-kafka-connect/lib/custom-write-model-strategy.jar
  4. Specify your Java class in the writemodel.strategy configuration setting.

The following is an example of a custom write strategy. It extracts the value of fullDocument from the value document and returns a ReplaceOne operation. The ReplaceOne operation replaces a document in the MongoDB collection that matches the ID from the value document.

import org.apache.kafka.connect.errors.DataException;

import org.bson.BsonDocument;

import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.WriteModel;

import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy;

/**
 * A custom write model strategy.
 *
 * This example takes the 'fullDocument' field from a change stream and creates a
 * ReplaceOne operation.
 */
public class CustomWriteModelStrategy implements WriteModelStrategy {

    private static final String ID = "_id";

    @Override
    public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
        BsonDocument changeStreamDocument = document.getValueDoc()
            .orElseThrow(() -> new DataException("Missing value document"));

        BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
        if (fullDocument.isEmpty()) {
            return null; // Return null to indicate no-op.
        }

        return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
    }
}
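
Assuming the class above is compiled and placed on the plugin path under the hypothetical package com.example, the final configuration step would resemble the following:

writemodel.strategy=com.example.CustomWriteModelStrategy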

Business Keys

A business key is a value composed of data within the sink record that identifies it as a unique document. The following examples demonstrate write model strategies that define business keys using one or more fields from the sink record.

In this example, we assemble a business key from multiple fields of a record and instruct the connector to generate BSON ObjectId instances for inserts, but not for updates.

Follow the steps below to configure this strategy:

  1. Create a unique index that corresponds to your business key in your target MongoDB collection.
  2. In the connector configuration, specify the PartialValueStrategy as the id strategy to identify the fields that belong to the business key.
  3. In the connector configuration, specify the ReplaceOneBusinessKeyStrategy writemodel strategy.

Suppose our task is to track airplane capacity by the flight number and airport location represented by flight_no and airport_code, respectively. An example message contains the following:

{
"flight_no": "Z342",
"airport_code": "LAX",
"passengers": {
"capacity": 180,
"occupied": 152
}
}

To implement the strategy, we first create a unique index on the flight_no and airport_code fields in the MongoDB shell:

db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })

Next, we specify the PartialValueStrategy as the id strategy, the fields to include in the business key, and the ReplaceOneBusinessKeyStrategy write model strategy in the configuration file:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=flight_no,airport_code
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

After processing the sample message, the document inserted into the collection contains the following data:

{
"_id": ObjectId('5db0b81199b7fbcc89631d06'),
"flight_no": "Z342"
"airport_code": "LAX",
"passengers_spaces": {
"capacity": 180,
"occupied": 152
}
}

When the connector processes sink data that matches an existing document with the same business key field values, it updates the document with the new values without changing the _id field.

{
"_id": ObjectId('5db0b81199b7fbcc89631d06'),
"flight_no": "Z342"
"airport_code": "LAX",
"passengers_spaces": {
"capacity": 180,
"occupied": 95
}
}

In this example, we show how to use the DeleteOneBusinessKeyStrategy to do the following:

  • Assemble a business key from a single field from a record.
  • Instruct the post processor to delete a document in the target collection that contains a value matching the one in the business key.

Suppose our task is to track airline passenger ticket reservations and cancellations. We can use the DeleteOneBusinessKeyStrategy to delete reservations when customers cancel or when reservations go unclaimed.

The following MongoDB document represents a ticket reservation in the tickets database and reservations collection:

{
"_id": ObjectId('5db0b81199b7fbcc89631d07'),
"flight_no": "Z920",
"airport_code": "CDG",
"reservation_no": "RT211583W"
}

The following steps show how we can configure the value of the reservation_no field as the business key and create the delete strategy:

  1. Create a unique index that corresponds to your business key in your target MongoDB collection.
  2. In the connector configuration, specify the PartialValueStrategy as the id strategy to identify the field that you want to use as the business key.
  3. In the connector configuration, specify the DeleteOneBusinessKeyStrategy writemodel strategy.
  4. Specify the source topic and target MongoDB database and collection.

To implement the DeleteOneBusinessKeyStrategy with our connector, first we create a unique index on the reservation_no field in the MongoDB shell:

db.collection.createIndex({ "reservation_no": 1 }, { unique: true })

Next, we specify the PartialValueStrategy strategy as the id strategy and use the projection setting to consider only the reservation_no field with the following configuration:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=reservation_no
document.id.strategy.partial.value.projection.type=AllowList

We specify the strategy in the configuration as follows:

writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy

We define the Kafka topic from which we receive cancellations and the target ticket reservation database and collection in the configuration file as follows:

topics=ReservationCancellations
database=tickets
collection=reservations

When the connector receives sink data from the "ReservationCancellations" topic, it extracts the business key using the strategy configuration settings and performs a delete operation on the target collection. If the reservation_no field contains a value of "RT211583W", the connector deletes the matching document from the collection.

This example shows how we can track the create and update timestamps on documents that are upserted by the connector. The UpdateOneTimestampsStrategy write model strategy performs the following tasks:

  • When a new MongoDB document is inserted into the collection by the connector, the _insertedTS and _modifiedTS fields will be set to the current time.
  • When an existing MongoDB document is updated in the collection by the connector, the _modifiedTS field will be updated to the current time.

This is set by specifying the UpdateOneTimestampsStrategy in the configuration file as follows:

writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

For this example, we want to track the position of a train along its route. The _insertedTS field provides us with the time and date that the first position report was saved into the collection.

The position report includes the following data in the value document:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40.8051693, -73.9388079 ]
}

The write model strategy is set to UpdateOneTimestampsStrategy to append the created and modified timestamps, and the document id strategy is set to ProvidedInValueStrategy to identify the train using the _id field of the value document.

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

The MongoDB document inserted after processing the initial message for the train includes the following data:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2019-10-23T15:08:000Z"),
"_modifiedTS": ISODate("2019-10-23T15:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40.805, -73.939 ]
}

After an hour, the train reports its current location along its route. The position and _modifiedTS fields are updated:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2019-10-23T15:08:000Z"),
"_modifiedTS": ISODate("2019-10-23T16:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 41.156, -73.870 ]
}