Kafka Sink Connector Post-Processors¶
Post Processing of Documents¶
Post processors are sink connector classes that modify data in the SinkDocument, a class that contains a BSON representation of the SinkRecord key and value fields, after it has been read from the Kafka topic. The connector applies each post processor in the chain to the SinkDocument, in the order provided, and stores the result in a MongoDB collection.
Post processors perform data modification tasks such as setting the document _id field, projecting the message key or value, renaming fields, and redacting sensitive information. You can implement your own post processor by extending the PostProcessor class (a minimal sketch appears after the table and chain configuration below) or use one of the following pre-built ones:
Post Processor Name | Description |
---|---|
DocumentIdAdder | Full Path: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder Uses a configured strategy to insert an _id field. See also: Configuration Options. |
BlockListKeyProjector | Full Path: com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector Removes matching key fields from the sink record. See also: Configuration and Example. |
BlockListValueProjector | Full Path: com.mongodb.kafka.connect.sink.processor.BlockListValueProjector Removes matching value fields from the sink record. See also: Configuration and Example. |
AllowListKeyProjector | Full Path: com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector Includes only matching key fields from the sink record. See also: Configuration and Example. |
AllowListValueProjector | Full Path: com.mongodb.kafka.connect.sink.processor.AllowListValueProjector Includes only matching value fields from the sink record. See also: Configuration and Example. |
KafkaMetaAdder | Full Path: com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder Adds a field composed of the concatenation of Kafka topic, partition, and offset to the document. |
RenameByMapping | Full Path: com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping Renames fields that are an exact match to a specified field name in the key or value document. See also: RenameByMapping Example. |
RenameByRegex | Full Path: com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex Renames fields that match a regular expression. See also: RenameByRegex. |
You can configure the post processor chain by specifying an ordered, comma-separated list of fully-qualified PostProcessor class names:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
The DocumentIdAdder post processor is automatically added to the first position in the chain if it is not already present.
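If you implement your own post processor, it participates in this same chain. The following is a minimal sketch of a custom post processor that redacts a hypothetical ssn field from the value document. It assumes the PostProcessor base class exposes a constructor that accepts the sink topic configuration and an overridable process(SinkDocument, SinkRecord) method, which matches recent versions of the connector; the class and field names are placeholders.

import org.apache.kafka.connect.sink.SinkRecord;
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.processor.PostProcessor;

/**
 * A sketch of a custom post processor that removes a hypothetical
 * "ssn" field from the value document before it is written to MongoDB.
 */
public class RedactSsnPostProcessor extends PostProcessor {

    public RedactSsnPostProcessor(final MongoSinkTopicConfig config) {
        super(config);
    }

    @Override
    public void process(final SinkDocument doc, final SinkRecord orig) {
        // Remove the field only when a value document is present.
        doc.getValueDoc().ifPresent(valueDoc -> valueDoc.remove("ssn"));
    }
}

Once compiled and placed on the connector's class path, the class can be added to the chain by its fully-qualified name, for example: post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.example.RedactSsnPostProcessor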
Configuration Options¶
This section explains the available configuration options for post processors included in the MongoDB Kafka Connector.
DocumentIdAdder¶
The DocumentIdAdder post processor provides the _id field for a SinkDocument before it is written to a MongoDB collection. This post processor is configured with a strategy that contains the logic for generating the value of the _id field. The following strategies are provided with this connector:
Strategy Name | Description |
---|---|
BsonOidStrategy | Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy Default value for the DocumentIdAdder post processor. Generates a MongoDB BSON ObjectId. |
KafkaMetaDataStrategy | Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy Builds a string composed of the concatenation of Kafka topic, partition, and offset. |
FullKeyStrategy | Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy Uses the complete key structure of the SinkDocument. Defaults to a blank document if no key exists. |
ProvidedInKeyStrategy | Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy Uses the _id field specified in the key structure of the SinkDocument if it exists. Throws an exception if the field is missing. |
ProvidedInValueStrategy | Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy Uses the _id field specified in the value structure of the SinkDocument if it exists. Throws an exception if the field is missing. |
PartialKeyStrategy | Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy Uses a block list or allow list projection of the key structure of the SinkDocument. Defaults to a blank document if no key exists. |
PartialValueStrategy | Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy Uses a block list or allow list projection of the value structure of the SinkDocument. Defaults to a blank document if no value exists. |
UuidProvidedInKeyStrategy | Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.UuidProvidedInKeyStrategy Converts the _id key field to a UUID. The value must be either a string or binary type and must conform to the UUID format. |
UuidProvidedInValueStrategy | Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.UuidProvidedInValueStrategy Converts the _id value field to a UUID. The value must be either a string or binary type and must conform to the UUID format. |
UuidStrategy | Full Path: com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy Generates a random UUID as a string. |
You can assign the document.id.strategy property as follows:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
To define a custom strategy, create a class that implements the IdStrategy interface and provide its fully-qualified class name in the document.id.strategy setting.
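To illustrate, the following is a minimal sketch of a custom strategy that derives the _id from a hypothetical orderId field in the value document. It assumes the IdStrategy interface declares a generateId(SinkDocument, SinkRecord) method returning a BsonValue, which matches recent versions of the connector; the class and field names are placeholders.

import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonValue;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy;

/**
 * A sketch of a custom id strategy that uses a hypothetical
 * "orderId" field from the value document as the _id.
 */
public class OrderIdStrategy implements IdStrategy {

    @Override
    public BsonValue generateId(final SinkDocument doc, final SinkRecord orig) {
        // Optional.map yields an empty Optional when the field is absent, so a
        // missing value document or missing orderId field raises the exception.
        return doc.getValueDoc()
            .map(valueDoc -> valueDoc.get("orderId"))
            .orElseThrow(() -> new IllegalArgumentException("Missing value document or orderId field"));
    }
}

You would then set document.id.strategy to the class's fully-qualified name, for example: document.id.strategy=com.example.OrderIdStrategy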
BSON ObjectId or UUID strategies can only guarantee at-least-once delivery, since new ids are generated on retries or reprocessing. Other strategies permit exactly-once delivery if the fields that form the document _id are guaranteed to be unique.
Block List / Allow List Projector¶
This section provides example projection configurations to show how they filter the following sample record:
{ "name": "Anonymous", "age": 42, "active": true, "address": { "city": "Unknown", "country": "NoWhereLand" }, "food": [ "Austrian", "Italian" ], "data": [ { "k": "foo", "v": 1 } ], "lut": { "key1": 12.34, "key2": 23.45 }, "destination: {
"city": "Springfield",
"country": "AnotherLand"
}
}
The following example configurations contain [key|value] placeholder values that represent either key or value, to avoid repetition. Specify the one appropriate to your use case when creating your configuration.
Block List Projection Example¶
In the following example sink configuration, we specify a block list projection and the specific fields to omit from the record:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockList[Key|Value]Projector
[key|value].projection.type=BlockList
[key|value].projection.list=age,address.city,lut.key2,data.v
You can use the "." (dot) notation to reference subdocuments in the record. You can also use it to reference fields of documents within an array.
The record contains the following data after applying the projection:
{ "name": "Anonymous", "active": true, "address": { "country": "NoWhereLand" }, "food": [ "Austrian", "Italian" ], "data": [ { "k": "foo" } ], "lut": { "key1": 12.34 }, "destination: {
"city": "Springfield",
"country": "AnotherLand"
}
}
Allow List Projection Example¶
In the following example sink configuration, we specify an allow list projection and the specific fields to include in the record:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowList[Key|Value]Projector
[key|value].projection.type=AllowList
[key|value].projection.list=age,address.city,lut.key2,data.v
You can use the "." notation to reference subdocuments in the record. You can also use it to reference fields of documents within an array.
The record contains the following data after applying the projection:
{ "age": 42, "address": { "city": "Unknown" }, "data": [ { "v": 1 } ], "lut": { "key2": 23.45 } }
Projection Wildcard Pattern Matching¶
The previous example projection configurations demonstrated exact string matching on field names. The projection list setting also supports the following wildcard patterns for matching field names:
- "*" (star): matches a string of any length for the level in the document at which it is specified.
- "**" (double star): matches the current level and all nested levels from the point at which it is specified.
The examples below demonstrate how to use each wildcard pattern and show the projection output for the following sample record:
{ "forecast": [ { "day": "Monday", "temp": { "degrees": 25.3, "units": "C", "range": { "low": 23.0, "high": 28.7 } }, "uv": 5 } ], "city": "Springfield", "population: {
"qty": 30.7,
"scale": 1000,
"units": "people"
}
}
Allow List Wildcard Examples
The * wildcard pattern in the example below matches all the keys named temp in the forecast array and all fields nested a single level below it:
[key|value].projection.type=AllowList
[key|value].projection.list=city,forecast.temp.*
The record contains the following data after applying the projection:
{ "city": "Springfield", "forecast": [ { "temp": { "degrees": 25.3, "units": "C", "range": { "low": 23.0, "high": 28.7 } } } ] }
The ** wildcard pattern in the example below matches all the keys at all levels that contain the field scale:
[key|value].projection.type=AllowList
[key|value].projection.list=**.scale
The record contains the following data after applying the projection:
{ "population: {
"qty": 30.7,
"scale": 1000,
"units": "people"
}
}
Block List Wildcard Examples
The wildcard character can also be used to match all field names at specific levels as demonstrated in the following block list projection configuration example:
[key|value].projection.type=BlockList
[key|value].projection.list=population,forecast.*.*
The record contains the following data after applying the projection:
{ "forecast": [ { "day": "Monday", "uv": 5 } ], "city": "Springfield", }
The connector configuration also supports the "**" (double star) wildcard, which matches the current and all nested levels from the point at which it is specified:
[key|value].projection.type=BlockList
[key|value].projection.list=**.high
The record contains the following data after applying the projection:
{
  "city": "Springfield",
  "population": {
    "qty": 30.7,
    "scale": 1000,
    "units": "people"
  }
}
Field Renaming Post Processors¶
This section provides example configurations for the RenameByMapping and RenameByRegex post processors to show how they update field names in a sink record. The field renaming parameters use dot notation to specify whether to update the key or value document in the record, as well as the pattern to match. You must specify the RenameByMapping and RenameByRegex properties with your parameters in a JSON array.
The field renaming post processor examples use the following sample sink record:
Sample Key Document
{ "location": "Provence", "date_month": "October", "date_day": 17 }
Sample Value Document
{ "flapjacks": { "purchased": 598, "size": "large" } }
RenameByMapping Example¶
The RenameByMapping post processor setting specifies one or more objects that assign fields matching a string to a new name in a key or value document. Each object contains the text to match in the oldName element and the replacement text in the newName element, as described in the following table.
Key Name | Description |
---|---|
oldName | Specifies whether to match a key or value document, followed by an appended string that identifies the field to replace. |
newName | Contains the replacement text for all matches of the field defined in the oldName field. |
The following example property matches the "location" field of a Key document and renames it to "country":
field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]
This transforms the original key document to the following one after applying the RenameByMapping post processor:
{ "country": "Provence", "date_month": "October", "date_day": 17 }
You can perform a similar field name assignment for value documents by specifying value with the appended field name in the oldName field as follows:
field.renamer.mapping=[{"oldName":"value.crepes", "newName":"flapjacks"}]
This transforms the original value document to the following one after applying the RenameByMapping post processor:
{ "flapjacks": { "purchased": 598, "size": "large" } }
You can also specify multiple mappings in the field.renamer.mapping property by using a JSON array as shown in the following example:
field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]
RenameByRegex¶
The RenameByRegex post processor setting is an array of objects. Each object in the array contains the following JSON element keys:
Key Name | Description |
---|---|
regexp | Contains a regular expression that matches the fields on which to perform the replacement. |
pattern | Contains a regular expression that matches the text to replace within the matched field names. |
replace | Contains the replacement text for all matches of the regular expression defined in the pattern field. |
Example
field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"},{"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]
When the post processor is applied to the sample key document and the sample value document, it produces the following output:
Key Document
{ "location": "Provence", "date-month": "October", "date-day": 17 }
Value Document
{ "crepes": { "quantity": 598, "size": "large" } }
The post processor applied the following changes:
- All field names in the key document of the sink record that start with "date" are matched. In the matched fields, all instances of "_" are replaced with "-".
- All field names in the value document of the sink record that are subdocuments of crepes are matched. In the matched fields, all instances of "purchased" are replaced with "quantity".
The renaming post processors update the key fields of a JSON document which can result in duplicate keys within a document. They skip the renaming step if the replacement key already exists at the current level.
Custom Write Model Strategy¶
A write model defines the behavior of bulk write operations made on a MongoDB collection. The default write model for the connector is ReplaceOneModel with ReplaceOptions set to upsert mode.
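To make the default concrete, the following sketch shows roughly what a ReplaceOneModel in upsert mode looks like when built with the MongoDB Java driver; it illustrates the behavior rather than the connector's exact implementation.

import org.bson.BsonDocument;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;

public class DefaultWriteModelSketch {
    // Roughly what the default write model amounts to: replace the document
    // matching on _id, inserting it if no match exists (upsert).
    static ReplaceOneModel<BsonDocument> defaultWriteModel(final BsonDocument doc) {
        return new ReplaceOneModel<>(
            Filters.eq("_id", doc.get("_id")),
            doc,
            new ReplaceOptions().upsert(true));
    }
}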
You can override the default write model by specifying a custom one in the writemodel.strategy configuration setting. The following strategies are provided with the connector:
Write Model | Description |
---|---|
ReplaceOneDefaultStrategy | Replaces at most one document that matches the current document by the _id field. Default value for the writemodel.strategy configuration setting. |
ReplaceOneBusinessKeyStrategy | Replaces at most one document that matches the filters provided by the document.id.strategy setting. Set the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy See also: WriteModel Strategy: Business Keys. |
DeleteOneDefaultStrategy | Deletes at most one document that matches the id specified by the document.id.strategy setting, only when the sink record contains a null value. Implicitly specified when the configuration setting mongodb.delete.on.null.values=true is set. You can set this explicitly with the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy |
UpdateOneTimestampsStrategy | Adds _insertedTS (inserted timestamp) and _modifiedTS (modified timestamp) fields to documents. Set the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy See also: WriteModel Strategy: Inserted and Modified Timestamps. |
UpdateOneBusinessKeyTimestampStrategy | Adds _insertedTS (inserted timestamp) and _modifiedTS (modified timestamp) fields to documents that match the filters provided by the document.id.strategy setting. Set the following configuration: writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy |
Create Your Own Custom Write Model Strategy¶
If none of the pre-built write strategies suits your needs, you can create your own. A custom write model strategy is a Java class that implements the WriteModelStrategy interface.
To configure your sink connector to use a custom WriteModelStrategy, follow these steps:
- Create a class that implements the WriteModelStrategy interface and overrides the createWriteModel(SinkDocument) method.
- Compile the class to a JAR file.
- Add the compiled JAR to the class path / plugin path for your Kafka workers. For more information about plugin paths, see the Confluent documentation.
Note: Plugins are loaded in isolation, so when you deploy a custom write strategy, both the connector JAR and the custom write model strategy JAR should be on the same path, as shown in the following example:
mongodb-kafka-connect/lib/mongo-kafka-connect-all.jar
mongodb-kafka-connect/lib/custom-write-model-strategy.jar
- Specify your Java class in the writemodel.strategy configuration setting.
The following is an example of a custom write strategy. It extracts the value of the fullDocument field from the value document and returns a ReplaceOne operation that replaces the document in the MongoDB collection whose _id matches the one in the value document.
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy;

/**
 * A custom write model strategy.
 *
 * This example takes the 'fullDocument' field from a change stream event
 * and creates a ReplaceOne operation.
 */
public class CustomWriteModelStrategy implements WriteModelStrategy {

    private static final String ID = "_id";

    @Override
    public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
        BsonDocument changeStreamDocument = document.getValueDoc()
            .orElseThrow(() -> new DataException("Missing value document"));

        BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
        if (fullDocument.isEmpty()) {
            return null; // Return null to indicate a no-op.
        }

        return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
    }
}
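Assuming the compiled class is on the connector's plugin path, you would reference it in the configuration by its fully-qualified class name (the package shown here is a placeholder):
writemodel.strategy=com.example.CustomWriteModelStrategy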
WriteModel Strategy: Business Keys¶
A business key is a value composed of data within the sink record that identifies it as a unique document. This example defines a business key using data contained in multiple fields of the record, and instructs the post processor to generate BSON ObjectIds on inserts but not on updates.
Follow the steps below to configure this strategy:
- Create a unique index that corresponds to your business key in your target MongoDB collection.
- In the connector configuration, specify the PartialValueStrategy as the id strategy to identify the fields that belong to the business key.
- In the connector configuration, specify the ReplaceOneBusinessKeyStrategy write model strategy.
For this example, we track airplane capacity by flight number and airport location, represented by the flight_no and airport_code fields, respectively. An example message contains the following:
{ "flight_no": "Z342", "airport_code": "LAX", "passengers": { "capacity": 180, "occupied": 152 } }
To implement the strategy, we first create a unique index on the flight_no and airport_code fields in the MongoDB shell:
db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })
Next, we specify the PartialValueStrategy as the id strategy, the fields that form the business key, and the ReplaceOneBusinessKeyStrategy write model strategy in the configuration file:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=flight_no,airport_code
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy
The sample data inserted into the collection contains the following:
{ "_id": ObjectId('5db0b81199b7fbcc89631d06'), "flight_no": "Z342" "airport_code": "LAX", "passengers_spaces": { "capacity": 180, "occupied": 152 } }
When the connector processes sink data that matches an existing document with the same business key field values, it updates the document with the new values without changing the _id field:
{ "_id": ObjectId('5db0b81199b7fbcc89631d06'), "flight_no": "Z342" "airport_code": "LAX", "passengers_spaces": { "capacity": 180, "occupied": 95 } }
WriteModel Strategy: Inserted and Modified Timestamps¶
This example shows how you can track the creation and update timestamps on documents that the connector upserts. The UpdateOneTimestampsStrategy write model strategy performs the following tasks:
- When the connector inserts a new MongoDB document into the collection, it sets both the _insertedTS and _modifiedTS fields to the current time.
- When the connector updates an existing MongoDB document in the collection, it updates the _modifiedTS field to the current time.
You set this by specifying UpdateOneTimestampsStrategy in the configuration file as follows:
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
For this example, we want to track the position of a train along its route. The _insertedTS field provides us with the time and date at which the first position report was saved to the collection.
The position report includes the following data in the value document:
{ "_id": "MN-1234", "start": "Beacon", "destination": "Grand Central" "position": [ 40.8051693, -73.9388079 ] }
The write model strategy is set to UpdateOneTimestampsStrategy to append the created and modified timestamps, and the document id strategy is set to ProvidedInValueStrategy to identify the train using the _id field of the value document.
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
The MongoDB document inserted after processing the initial message for the train includes the following data:
{ "_id": "MN-1234", "_insertedTS": ISODate('2019-10-23T15:08:000Z"),
"_modifiedTS": ISODate("2019-10-23T15:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40.805, -73.939 ]
}
After an hour, the train reports its current location along the route, and the position and _modifiedTS fields are updated:
{ "_id": "MN-1234", "_insertedTS": ISODate('2019-10-23T15:08:000Z"),
"_modifiedTS": ISODate("2019-10-23T16:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 41.156, -73.870 ]
}