
Kafka Sink Connector Post-Processors

Post Processing of Documents

Post processors are sink connector classes that modify data in the SinkDocument, a class that contains a BSON representation of the SinkRecord key and value fields, after it has been read from the Kafka topic. The connector applies the post processors as a chain: each post processor runs in the order provided on the SinkDocument, and the result is stored in a MongoDB collection.

Post processors perform data modification tasks such as setting the document _id field, key or value field projection, renaming fields, and redacting sensitive information. You can use the following pre-built post processors or implement your own by extending the PostProcessor class:

Post Processor Name Description
DocumentIdAdder
Full Path: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Uses a configured strategy to insert an _id field.
BlacklistKeyProjector
Full Path: com.mongodb.kafka.connect.sink.processor.BlacklistKeyProjector
Removes matching key fields from the sink record.

See also

Configuration and Example.

BlacklistValueProjector
Full Path: com.mongodb.kafka.connect.sink.processor.BlacklistValueProjector
Removes matching value fields from the sink record.

See also

Configuration and Example.

WhitelistKeyProjector
Full Path: com.mongodb.kafka.connect.sink.processor.WhitelistKeyProjector
Includes only matching key fields from the sink record.

See also

Configuration and Example.

WhitelistValueProjector
Full Path: com.mongodb.kafka.connect.sink.processor.WhitelistValueProjector
Includes only matching value fields from the sink record.

See also

Configuration and Example.

KafkaMetaAdder
Full Path: com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
Adds a field composed of the concatenation of Kafka topic, partition, and offset to the document.
RenameByMapping
Full Path: com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping
Renames fields that are an exact match to a specified key or value field.
RenameByRegex
Full Path: com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex
Renames fields that match a regular expression.

You can configure the post processor chain by specifying an ordered, comma separated list of fully-qualified PostProcessor class names:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.WhitelistValueProjector

Note

The DocumentIdAdder post processor is automatically added to the first position in the chain if not already present.
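
If the built-in post processors do not cover your use case, you can extend the PostProcessor class as described above. The following Java sketch is illustrative only: the class and package names come from this page, but the constructor parameter (MongoSinkTopicConfig) and the process(SinkDocument, SinkRecord) signature are assumptions that you should verify against the PostProcessor base class shipped with your connector version.

import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonString;

import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.processor.PostProcessor;

// Illustrative custom post processor that stamps every value document
// with a static "source" field. The constructor and process(...) method
// signatures are assumptions; check them against your connector version.
public class SourceTaggingPostProcessor extends PostProcessor {

  public SourceTaggingPostProcessor(final MongoSinkTopicConfig config) {
    super(config);
  }

  @Override
  public void process(final SinkDocument doc, final SinkRecord originalRecord) {
    // Only touch the value document if the record has one.
    doc.getValueDoc().ifPresent(valueDoc ->
        valueDoc.put("source", new BsonString("kafka-sink")));
  }
}

To use a custom post processor, package the class in a JAR on the Kafka Connect plugin path and add its fully-qualified name (here the hypothetical SourceTaggingPostProcessor) to the post.processor.chain setting.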

Configuration Options

This section explains the available configuration options for post processors included in the MongoDB Kafka Connector.

DocumentIdAdder

The DocumentIdAdder post processor provides the _id field for a SinkDocument before it is written to a MongoDB collection. This post processor is configured using a strategy that contains the logic for generating the value of the _id. The following strategies are provided with this connector:

Strategy Name Description
BsonOidStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Default value for the DocumentIdAdder post processor.
Generates a MongoDB BSON ObjectId.
KafkaMetaDataStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy
Builds a string composed of the concatenation of Kafka topic, partition, and offset.
FullKeyStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy
Uses the complete key structure of the SinkDocument.
Defaults to a blank document if no key exists.
ProvidedInKeyStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy
Uses the _id field specified in the key structure of the SinkDocument if it exists.
Throws an exception if the field is missing.
ProvidedInValueStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
Uses the _id field specified in the value structure of the SinkDocument if it exists.
Throws an exception if the field is missing.
PartialKeyStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
Uses a blacklist or whitelist projection of the key structure of the SinkDocument.
Defaults to a blank document if no key exists.
PartialValueStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
Uses a blacklist or whitelist projection of the value structure of the SinkDocument.
Defaults to a blank document if no value exists.
UuidStrategy
Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
Generates a random UUID as a string.

You can assign the document.id.strategy property as follows:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy

To define a custom strategy, create a class that implements the IdStrategy interface and provide its fully-qualified class name in the document.id.strategy setting.
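
For example, a custom strategy might derive the _id from a single field of the value document. The sketch below is a minimal illustration: the flight_no field name and the com.example package are hypothetical, and the generateId(SinkDocument, SinkRecord) signature is an assumption to verify against the IdStrategy interface in your connector version.

import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonString;
import org.bson.BsonValue;

import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy;

// Illustrative id strategy: use the value document's "flight_no" field
// (a hypothetical field) as the _id, falling back to an empty string.
// The generateId(...) signature is an assumption; verify it against
// the IdStrategy interface in your connector version.
public class FlightNumberIdStrategy implements IdStrategy {

  @Override
  public BsonValue generateId(final SinkDocument doc, final SinkRecord orig) {
    return doc.getValueDoc()
        .map(valueDoc -> valueDoc.getOrDefault("flight_no", new BsonString("")))
        .orElse(new BsonString(""));
  }
}

Package the compiled class on the Kafka Connect plugin path and reference it in the configuration, for example document.id.strategy=com.example.FlightNumberIdStrategy (package name hypothetical).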

The selected strategy can affect delivery semantics

The BSON ObjectId and UUID strategies can only guarantee at-least-once delivery, since new ids are generated on retries or re-processing. The other strategies permit exactly-once delivery if the fields that form the document _id are guaranteed to be unique.

Blacklist / Whitelist Projector

This section provides example projection configurations to show how they filter the following sample record:

{
  "name": "Anonymous",
  "age": 42,
  "active": true,
  "address": {
    "city": "Unknown",
    "country": "NoWhereLand"
  },
  "food": [
    "Austrian",
    "Italian"
  ],
  "data": [
    {
      "k": "foo",
      "v": 1
    }
  ],
  "lut": {
    "key1": 12.34,
    "key2": 23.45
  },
  "destination: {
    "city": "Springfield",
    "country": "AnotherLand"
  }
}

Note

The following example configurations contain [key|value] placeholder values that represent either key or value in order to avoid repetition. Specify the one appropriate to your use case when creating your configuration.

Blacklist Projection Example

In the following example sink configuration, we specify a blacklist projection and the specific fields to omit from the record:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.Blacklist[Key|Value]Projector
[key|value].projection.type=blacklist
[key|value].projection.list=age,address.city,lut.key2,data.v

Note

You can use the “.” (dot) notation to reference subdocuments in the record. You can also use it to reference fields of documents within an array.

The record contains the following data after applying the projection:

{
  "name": "Anonymous",
  "active": true,
  "address": {
    "country": "NoWhereLand"
  },
  "food": [
    "Austrian",
    "Italian"
  ],
  "data": [
    {
      "k": "foo"
    }
  ],
  "lut": {
    "key1": 12.34
  },
  "destination: {
    "city": "Springfield",
    "country": "AnotherLand"
  }
}

Whitelist Projection Example

In the following example sink configuration, we specify a whitelist projection and the specific fields to include in the record:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.Whitelist[Key|Value]Projector
[key|value].projection.type=whitelist
[key|value].projection.list=age,address.city,lut.key2,data.v

Note

You can use the “.” notation to reference subdocuments in the record. You can also use it to reference fields of documents within an array.

The record contains the following data after applying the projection:

{
  "age": 42,
  "address": {
    "city": "Unknown"
  },
  "data": [
    {
      "v": 1
    }
  ],
  "lut": {
    "key2": 23.45
  }
}

Projection Wildcard Pattern Matching

The previous example projection configurations demonstrated exact string matching on field names. The projection list setting also supports the following wildcard patterns for matching field names:

  • “*” (star): matches a string of any length at the level of the document at which it is specified.
  • “**” (double star): matches the current and all nested levels from which it is specified.

The examples below demonstrate how to use each wildcard pattern and the projection output from the following sample record:

{
  "forecast": [
    { "day": "Monday",
      "temp": {
        "degrees": 25.3,
        "units": "C",
        "range": {
          "low": 23.0,
          "high": 28.7
        }
      },
      "uv": 5
    }
  ],
  "city": "Springfield",
  "population: {
    "qty": 30.7,
    "scale": 1000,
    "units": "people"
  }
}

Whitelist Wildcard Examples

The * wildcard pattern in the example below matches all the keys named temp in the forecast array and all fields nested a single level below it.

[key|value].projection.type=whitelist
[key|value].projection.list=city,forecast.temp.*

The record contains the following data after applying the projection:

{
  "city": "Springfield",
  "forecast": [
    {
      "temp": {
        "degrees": 25.3,
        "units": "C",
        "range": {
          "low": 23.0,
          "high": 28.7
        }
      }
    }
  ]
}

The ** wildcard pattern in the example below matches all keys, at any level, of documents that contain a field named scale.

[key|value].projection.type=whitelist
[key|value].projection.list=**.scale

The record contains the following data after applying the projection:

{
  "population: {
    "qty": 30.7,
    "scale": 1000,
    "units": "people"
  }
}

Blacklist Wildcard Examples

The wildcard character can also be used to match all field names at specific levels as demonstrated in the following blacklist projection configuration example:

[key|value].projection.type=blacklist
[key|value].projection.list=population,forecast.*.*

The record contains the following data after applying the projection:

{
  "forecast": [
    {
      "day": "Monday",
      "uv": 5
    }
  ],
  "city": "Springfield",
}

The connector configuration also supports the ** (double star) wildcard which matches the current and all nested levels from which it is specified.

[key|value].projection.type=blacklist
[key|value].projection.list=**.high

The record contains the following data after applying the projection:

{
  "city": "Springfield",
  "population": {
    "qty": 30.7,
    "scale": 1000,
    "units": "people"
  }
}

Field Renaming Post Processors

This section provides example configurations for the RenameByMapping and RenameByRegex post processors to show how they update field names in a sink record. Each field renaming setting is a JSON array of objects; each object uses dot notation to indicate whether to update the key or value document of the record and specifies the text or pattern to match along with its replacement.

The field renaming post processor examples use the following sample sink record:

Key document

{
  "location": "Provence",
  "date_month": "October",
  "date_day": 17
}

Value document

{
  "flapjacks": {
    "purchased": 598,
    "size": "large"
  }
}

RenameByMapping Example

The RenameByMapping post processor setting is an array of objects. Each object in the array contains the following JSON element keys:

Key Name Description
oldName Contains a string that matches on the text to replace.
newName Contains the replacement text for all matches of the string defined in the oldName field.
field.renamer.mapping=[{"oldName":"key.location","newName":"city"},{"oldName":"value.flapjacks","newName":"crepes"}]

The record contains the following data after applying the RenameByMapping post processor:

Key document

{
  "city": "Provence",
  "date_month": "October",
  "date_day": 17
}

Value document

{
  "crepes": {
    "purchased": 598,
    "size": "large"
  }
}

RenameByRegex Example

The RenameByRegex post processor setting is an array of objects. Each object in the array contains the following JSON element keys:

Key Name Description
regexp Contains a regular expression that matches fields to perform the replacement.
pattern Contains a regular expression that matches on the text to replace.
replace Contains the replacement text for all matches of the regular expression defined in the pattern field.

Example

field.renamer.mapping=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"},{"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]

The record contains the following data after applying the RenameByRegex post processor:

Key document

{
  "city": "Provence",
  "date-month": "October",
  "date-day": 17
}

Value document

{
  "crepes": {
    "quantity": 598,
    "size": "large"
  }
}

The post processor applied the following changes:

  • All field names in the key document of the sink record that start with “date” are matched. In the matched fields, all instances of “_” are replaced with “-”.
  • All field names in the value document of the sink record that are subdocuments of crepes are matched. In the matched fields, all instances of “purchased” are replaced with “quantity”.

Ensure renaming does not result in duplicate keys in the same document

The renaming post processors update field names in a JSON document, which can result in duplicate field names at the same level. The post processors skip the renaming step if the target field name already exists at the current level of the document.

Custom Write Models

A write model defines the behavior of bulk write operations made on a MongoDB collection. The default write model for the connector is ReplaceOneModel with ReplaceOptions set to upsert mode.
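
In terms of the MongoDB Java driver, the default behavior corresponds roughly to the bulk write operation sketched below. This is only an illustration of the concept, not the connector's actual implementation; it assumes the value document already contains the _id chosen by the id strategy.

import org.bson.BsonDocument;
import org.bson.BsonValue;

import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;

// Illustrative only: roughly what the default strategy produces for each
// sink document -- replace the document whose _id matches, or insert it
// if no match exists (upsert).
public class DefaultWriteModelSketch {

  static ReplaceOneModel<BsonDocument> toWriteModel(final BsonDocument valueDoc) {
    final BsonValue id = valueDoc.get("_id");
    return new ReplaceOneModel<>(
        Filters.eq("_id", id),
        valueDoc,
        new ReplaceOptions().upsert(true));
  }
}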

You can override the default write model by specifying a custom one in the writemodel.strategy configuration setting. The following strategies are provided with the connector:

Write Model Description
ReplaceOneDefaultStrategy
Replaces at most one document that matches the current document by the _id field.
Default value for writemodel.strategy configuration setting.
ReplaceOneBusinessKeyStrategy
Replaces at most one document that matches filters provided by the document.id.strategy setting.
Set the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

See also

Example of usage in WriteModel Strategy: Business Keys.

DeleteOneDefaultStrategy
Deletes at most one document that matches the id specified by the document.id.strategy setting, but only when the sink record contains a null value.
Implicitly specified when the configuration setting delete.on.null.values=true is set.
You can set this explicitly with the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy
UpdateOneTimestampsStrategy
Adds _insertedTS (inserted timestamp) and _modifiedTS (modified timestamp) fields to documents.
Set the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

Note

Future versions of the MongoDB Kafka Connector will allow developers to specify a user-created, custom strategy in the configuration settings.
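
The write model strategy is typically combined with the other sink settings covered on this page. The following minimal properties sketch shows one way the settings fit together; the topics, connection.uri, database, and collection values are placeholders for your environment, and the strategy choices are only an example.

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
topics=example-topic
connection.uri=mongodb://localhost:27017
database=exampleDb
collection=exampleCollection
post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy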

WriteModel Strategy: Business Keys

A business key is a value, composed of data within the sink record, that uniquely identifies the document. This example defines a business key using multiple fields of the record and instructs the connector to generate a BSON ObjectId on inserts, but not on updates.

The following steps are required to configure this strategy:

  1. Create a unique index that corresponds to your business key in your target MongoDB collection.
  2. In the connector configuration, specify the PartialValueStrategy as the id strategy to identify the fields that belong to the business key.
  3. In the connector configuration, specify the ReplaceOneBusinessKeyStrategy writemodel strategy.

For this example, we track airplane capacity by the flight number and the airport at which the plane is currently located, represented by the flight_no and airport_code fields, respectively. An example message contains the following:

{
  "flight_no": "Z342",
  "airport_code": "LAX",
  "passengers": {
    "capacity": 180,
    "occupied": 152
  }
}

In order to implement the strategy, we first create a unique index on the flight_no and airport_code fields in the MongoDB shell:

db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })

Next, we specify the PartialValueStrategy id strategy, the fields that form the business key, and the ReplaceOneBusinessKeyStrategy write model strategy in the configuration file:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
value.projection.list=flight_no,airport_code
value.projection.type=whitelist
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

The sample data inserted into the collection contains the following:

{
  "_id": ObjectId('5db0b81199b7fbcc89631d06'),
  "flight_no": "Z342"
  "airport_code": "LAX",
  "passengers_spaces": {
    "capacity": 180,
    "occupied": 152
  }
}

When the connector processes sink data that matches an existing document with the same business key field values, it updates the document with the new values without changing the _id field.

{
  "_id": ObjectId('5db0b81199b7fbcc89631d06'),
  "flight_no": "Z342"
  "airport_code": "LAX",
  "passengers_spaces": {
    "capacity": 180,
    "occupied": 95
  }
}

Projection post processors are not compatible with PartialValueStrategy

The PartialValueStrategy id strategy uses the [key|value].projection.type and [key|value].projection.list settings to define which fields are used to form the _id field. Since the Blacklist and Whitelist post processors use the same projection settings, they cannot be specified separately. Use a Single Message Transform (SMT) outside the connector to format your source data as necessary.
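
For example, Kafka Connect's built-in ReplaceField transformation can drop fields before the record reaches the connector. The sketch below assumes a properties-based sink configuration; the transform alias (dropSensitive) and the field names are illustrative, and the property is named blacklist in older Kafka releases and exclude in newer ones.

transforms=dropSensitive
transforms.dropSensitive.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.dropSensitive.blacklist=age,address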

WriteModel Strategy: Inserted and Modified Timestamps

This example shows how we can track the create and update timestamps on documents that are upserted by the connector. The UpdateOneTimestampsStrategy custom write model strategy performs the following tasks:

  • When a new MongoDB document is inserted into the collection by the connector, the _insertedTS and _modifiedTS fields will be set to the current time.
  • When an existing MongoDB document is updated in the collection by the connector, the _modifiedTS field will be updated to the current time.

This is set by specifying the UpdateOneTimestampsStrategy in the configuration file as follows:

writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

For this example, we want to track the position of a train along its route. The _insertedTS field records the time and date that the first position report was saved to the collection, while the _modifiedTS field records when the most recent report was applied.

The position report includes the following data in the value document:

{
  "_id": "MN-1234",
  "start": "Beacon",
  "destination": "Grand Central"
  "position": [ 40.8051693, -73.9388079 ]
}

The writemodel strategy is set to UpdateOneTimestampsStrategy to append the created and modified timestamps, and the document id strategy is set to ProvidedInValueStrategy to identify the train using the _id field of the value document.

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

The MongoDB document inserted after processing the initial message for the train includes the following data:

{
  "_id": "MN-1234",
  "_insertedTS": ISODate('2019-10-23T15:08:000Z"),
  "_modifiedTS": ISODate("2019-10-23T15:08:000Z"),
  "start": "Beacon",
  "destination": "Grand Central"
  "position": [ 40.805, -73.939 ]
}

After an hour, the train reports its current location along its route. The position and _modifiedTS fields are updated:

{
  "_id": "MN-1234",
  "_insertedTS": ISODate('2019-10-23T15:08:000Z"),
  "_modifiedTS": ISODate("2019-10-23T16:08:000Z"),
  "start": "Beacon",
  "destination": "Grand Central"
  "position": [ 41.156, -73.870 ]
}