Write Model Strategies

On this page

  • Overview
  • How to Specify Write Model Strategies
  • Specify a Business Key
  • Examples
  • Update One Time Stamps Strategy
  • Replace One Business Key Strategy
  • Delete One Business Key Strategy
  • Custom Write Model Strategies
  • Sample Write Model Strategy
  • How to Use Your Strategy

This guide shows you how to change the way your sink connector writes data to MongoDB.

Use cases for changing how your connector writes data to MongoDB include:

  • Improve performance by inserting rather than upserting documents
  • Replace documents using a filter other than your _id field
  • Delete documents in MongoDB matching a filter on your messages

You configure how your connector writes data to MongoDB by specifying a write model strategy. A write model strategy defines how your connector constructs the write models that form the bulk write operations it performs on your MongoDB deployment. A write model is a representation of an individual write operation. A strategy, in this sense, is an interchangeable algorithm that you can select in the MongoDB Kafka Connector.

If you would rather learn how to modify the sink records your connector receives before your connector writes them to MongoDB, read the Sink Connector Post Processors guide.

To learn more about bulk writes, see the MongoDB Java driver guide on bulk writes.

To learn how write model strategies work in the connector, see this section of the MongoDB Kafka Connector source code.

To learn more about strategies, see the Wikipedia article on the strategy pattern.
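
The relationship between strategies, write models, and bulk writes can be sketched in plain Java. This is a simplified illustration of the strategy pattern, not the connector's API: the SketchStrategy, InsertSketch, ReplaceSketch, and SketchSink names are hypothetical, and each write model is reduced to a descriptive string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical strategy interface: turns one record into a description of one write. */
interface SketchStrategy {
    String createWriteModel(Map<String, Object> record);
}

/** Always inserts the record as a new document. */
class InsertSketch implements SketchStrategy {
    public String createWriteModel(Map<String, Object> record) {
        return "insertOne(" + record + ")";
    }
}

/** Replaces the document whose _id matches the record, inserting it if none matches. */
class ReplaceSketch implements SketchStrategy {
    public String createWriteModel(Map<String, Object> record) {
        return "replaceOne({_id=" + record.get("_id") + "}, " + record + ", upsert=true)";
    }
}

class SketchSink {
    /** Builds one bulk write (a list of write models) with whichever strategy is passed in. */
    static List<String> buildBulkWrite(SketchStrategy strategy, List<Map<String, Object>> records) {
        List<String> models = new ArrayList<>();
        for (Map<String, Object> record : records) {
            models.add(strategy.createWriteModel(record));
        }
        return models;
    }
}
```

Swapping InsertSketch for ReplaceSketch changes every write model in the batch without touching the code that assembles the batch, which is the property a configurable write model strategy relies on.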

To specify a write model strategy, use the following option:

writemodel.strategy=<a write model strategy>

For a list of all the pre-built write model strategies the MongoDB Kafka Connector provides, see our guide on sink connector configuration properties.
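
As a rough illustration, the following Java sketch assembles a sink connector configuration as a map of string properties, the form in which Kafka Connect passes settings to a connector. The topic, connection URI, database, and collection values are placeholders, and the sketch assumes the InsertOneDefaultStrategy class from the pre-built strategies; check the configuration properties guide for the names available in your connector version.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SinkConfigSketch {
    /** Returns an example sink connector configuration that selects an insert-only strategy. */
    static Map<String, String> sinkConfig() {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector.class", "com.mongodb.kafka.connect.MongoSinkConnector");
        config.put("topics", "example-topic");                     // placeholder topic
        config.put("connection.uri", "mongodb://localhost:27017"); // placeholder URI
        config.put("database", "exampleDb");                       // placeholder database
        config.put("collection", "exampleCollection");             // placeholder collection
        // Assumed pre-built strategy: insert documents rather than upsert them.
        config.put("writemodel.strategy",
            "com.mongodb.kafka.connect.sink.writemodel.strategy.InsertOneDefaultStrategy");
        return config;
    }
}
```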

A business key is a value composed of one or more fields in your sink record that identifies it as unique. The following write model strategies require a business key:

  • ReplaceOneBusinessKeyStrategy
  • DeleteOneBusinessKeyStrategy
  • UpdateOneBusinessKeyTimestampStrategy

Your sink connector uses the _id field of your sink record to retrieve your business key. To specify a business key, you must set the _id field of your sink record with the Document ID Adder post processor.

You can set the _id field in your sink record like this:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=<comma-separated list of fields>
document.id.strategy.partial.value.projection.type=AllowList
Important
Unique Index

Create a unique index in your target collection corresponding to the fields of your business key. This will improve the performance of write operations from your sink connector. To learn more, see our guide on unique indexes.

For more information on the Document ID Adder post processor, see Configure the Document Id Adder Post Processor.
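
To make the idea of a projected business key concrete, here is a simplified Java model of an AllowList projection. It is not the connector's implementation: the BusinessKeySketch class and its methods are hypothetical, and plain maps stand in for BSON documents.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BusinessKeySketch {
    /** Copies only the allow-listed fields of the record value into a new map. */
    static Map<String, Object> projectAllowList(Map<String, Object> value, List<String> fields) {
        Map<String, Object> key = new LinkedHashMap<>();
        for (String field : fields) {
            if (value.containsKey(field)) {
                key.put(field, value.get(field));
            }
        }
        return key;
    }

    /** Returns a copy of the record value whose _id holds the projected business key. */
    static Map<String, Object> withBusinessKeyId(Map<String, Object> value, List<String> fields) {
        Map<String, Object> document = new LinkedHashMap<>(value);
        document.put("_id", projectAllowList(value, fields));
        return document;
    }
}
```

In this model, the _id of the sink record becomes a sub-document that contains only the business key fields, which a business key strategy can then turn into a query filter.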

This section shows examples of configuration and output of the following write model strategies:

  • Update One Timestamps Strategy
  • Replace One Business Key Strategy
  • Delete One Business Key Strategy

In this example, you use the UpdateOneTimestampsStrategy to track the position of a train.

Your sink connector receives messages with the following structure:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central",
"position": [ 40, -73 ]
}

Use the ProvidedInValueStrategy to specify that your connector should use the _id value of your message as the _id field in your MongoDB document. Set your write model strategy and id strategy with the following options:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

Assume your sink connector receives the preceding sample message. After your sink connector processes the message, your MongoDB collection contains the following document:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:00.000Z"),
"_modifiedTS": ISODate("2021-09-20T15:08:00.000Z"),
"start": "Beacon",
"destination": "Grand Central",
"position": [ 40, -73 ]
}

After an hour, your sink connector receives the following message:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central",
"position": [ 41, -73 ]
}

Once your sink connector updates MongoDB, your document in MongoDB looks like this:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:00.000Z"),
"_modifiedTS": ISODate("2021-09-20T16:08:00.000Z"),
"start": "Beacon",
"destination": "Grand Central",
"position": [ 41, -73 ]
}

For more information on the ProvidedInValueStrategy, see Configure the Document Id Adder Post Processor.
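
The behavior in this example can be modeled with a small in-memory sketch. It assumes, based on the documents shown above, that _insertedTS is written only when the document is first created and that _modifiedTS is refreshed on every write. The TimestampUpsertSketch class is hypothetical and uses a map in place of a MongoDB collection.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class TimestampUpsertSketch {
    // In-memory stand-in for a collection, keyed by _id.
    private final Map<Object, Map<String, Object>> collection = new HashMap<>();

    /** Applies one message at time `now`, mimicking an upsert that maintains both timestamps. */
    void apply(Map<String, Object> message, Instant now) {
        Object id = message.get("_id");
        Map<String, Object> document = collection.get(id);
        if (document == null) {
            document = new LinkedHashMap<>();
            document.put("_insertedTS", now); // set only when the document is created
            collection.put(id, document);
        }
        document.putAll(message);             // overwrite the fields carried by the message
        document.put("_modifiedTS", now);     // refreshed on every write
    }

    /** Returns the stored document for the given _id, or null if there is none. */
    Map<String, Object> find(Object id) {
        return collection.get(id);
    }
}
```

Applying the two train messages one hour apart leaves _insertedTS at the first timestamp and moves _modifiedTS to the second, matching the documents in this example.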

In this example, your sink connector receives messages with the following structure and your business key consists of the color and taste fields:

{
"color": "blue",
"taste": "good",
"quantity": 1
}

First, create a unique index on the color and taste fields in your target collection by running the following command in the MongoDB shell:

db.collection.createIndex({ "color": 1, "taste": 1}, { unique: true })

Then, specify your business key and write model strategy in your sink connector with the following configuration:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=color,taste
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

Assume your sink connector receives the preceding sample message. After your sink connector processes the message, your MongoDB collection contains the following document:

{
"_id": ObjectId('5db0b81199b7fbcc89631d06'),
"color": "blue",
"taste": "good",
"quantity": 1
}

Your sink connector now receives the following message:

{
"color": "blue",
"taste": "good",
"quantity": 2
}

After your sink connector processes the message, your document in MongoDB looks like this:

{
"_id": ObjectId('5db0b81199b7fbcc89631d06'),
"color": "blue",
"taste": "good",
"quantity": 2
}
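
The replace-by-business-key behavior above can be approximated in plain Java. This is a sketch under stated assumptions rather than the connector's code: the ReplaceByKeySketch class is hypothetical, a list of maps stands in for the collection, an integer counter stands in for generated ObjectId values, and messages are assumed not to carry their own _id field.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class ReplaceByKeySketch {
    private final List<Map<String, Object>> collection = new ArrayList<>();
    private int nextId = 1; // stand-in for generated ObjectId values

    /** Replaces the first document that matches every business key field, or inserts one. */
    void replaceOne(Map<String, Object> message, List<String> keyFields) {
        for (int i = 0; i < collection.size(); i++) {
            Map<String, Object> existing = collection.get(i);
            if (matches(existing, message, keyFields)) {
                Map<String, Object> replacement = new LinkedHashMap<>();
                replacement.put("_id", existing.get("_id")); // the stored _id survives
                replacement.putAll(message);
                collection.set(i, replacement);
                return;
            }
        }
        Map<String, Object> inserted = new LinkedHashMap<>();
        inserted.put("_id", nextId++);
        inserted.putAll(message);
        collection.add(inserted);
    }

    private static boolean matches(Map<String, Object> document, Map<String, Object> message,
                                   List<String> keyFields) {
        for (String field : keyFields) {
            if (!Objects.equals(document.get(field), message.get(field))) {
                return false;
            }
        }
        return true;
    }

    List<Map<String, Object>> documents() {
        return collection;
    }
}
```

Processing both sample messages with the key fields color and taste leaves a single document whose quantity is 2 and whose _id is unchanged.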

In this example, your sink connector receives messages with the following structure, and your business key is the name field:

{
"name": "Spot",
"animal": "dog"
}

First, create a unique index on the name field in your target collection by running the following command in the MongoDB shell:

db.collection.createIndex({ "name": 1 }, { unique: true })

Then, specify your business key and write model strategy in your sink connector with the following configuration:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=name
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy

Your MongoDB collection contains the following document:

{
"_id": ObjectId('5db0b81199b7fbcc89631d06'),
"name": "Spot",
"animal": "dog",
"friendly": true
}

Now, assume your sink connector receives the preceding sample message. After processing the sample message, your sink connector has deleted the preceding sample document from your collection.
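
A minimal Java sketch of the delete step follows. It assumes that the strategy removes at most one document whose business key fields equal those of the message. The DeleteByKeySketch class is hypothetical and operates on a list of maps rather than a MongoDB collection.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class DeleteByKeySketch {
    /**
     * Removes the first document whose key fields all equal the message's values.
     * Returns true if a document was removed.
     */
    static boolean deleteOne(List<Map<String, Object>> collection, Map<String, Object> message,
                             List<String> keyFields) {
        Iterator<Map<String, Object>> iterator = collection.iterator();
        while (iterator.hasNext()) {
            Map<String, Object> document = iterator.next();
            boolean match = true;
            for (String field : keyFields) {
                if (!Objects.equals(document.get(field), message.get(field))) {
                    match = false;
                    break;
                }
            }
            if (match) {
                iterator.remove(); // only the business key decides the match
                return true;
            }
        }
        return false;
    }
}
```

Fields outside the business key, such as friendly in the stored document, play no part in the match.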

If none of the pre-built write model strategies fit your use case, you can create your own.

Your custom write model strategy is a Java class that implements the WriteModelStrategy interface and overrides the createWriteModel() method.

To learn more, see the source code for the WriteModelStrategy interface.

The following custom write model strategy returns a write operation that replaces a document in MongoDB that matches the _id field of your sink record with the value of the fullDocument field of your sink record:

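import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.writemodel.strategy.WriteModelStrategy;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;

/**
 * A custom write model strategy
 *
 * This example takes the 'fullDocument' field from a change stream and creates a
 * ReplaceOne operation.
 */
public class CustomWriteModelStrategy implements WriteModelStrategy {
    private static final String ID = "_id";

    @Override
    public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
        // The sink record value holds the change stream event.
        BsonDocument changeStreamDocument = document.getValueDoc()
            .orElseThrow(() -> new DataException("Missing value document"));
        // Fall back to an empty document when the event has no 'fullDocument' field.
        BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
        if (fullDocument.isEmpty()) {
            return null; // Return null to indicate no op.
        }
        // Replace the document whose _id matches the _id of 'fullDocument'.
        return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
    }
}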

For another example of a custom write model strategy, see the kafka-edu repository on GitHub.

To configure your sink connector to use your custom write model strategy, you must do the following actions:

  1. Compile your custom write model strategy class to a JAR file.
  2. Add the compiled JAR to the classpath/plugin path for your Kafka workers. For more information about plugin paths, see the Confluent documentation.

    Note

    Kafka Connect loads plugins in isolation. When you deploy a custom write model strategy, both the connector JAR and the write model strategy JAR should be on the same path. Your paths should resemble the following:

    <plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
    <plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jar

    To learn more about Kafka Connect plugins, see this guide from Confluent.

  3. Specify your custom class in the writemodel.strategy configuration setting.

To learn how to compile a class to a JAR file, see this guide from Oracle.
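
The value of writemodel.strategy is a class name, so the class must be loadable from the same place as the connector. The following sketch illustrates that idea with plain Java reflection. It assumes that a configured class name is resolved and instantiated through a no-argument constructor, and the NamedStrategy, ExampleStrategy, and StrategyLoaderSketch names are hypothetical rather than part of the connector.

```java
/** Hypothetical stand-in for a strategy interface. */
interface NamedStrategy {
    String describe();
}

/** Hypothetical strategy with an implicit no-argument constructor. */
class ExampleStrategy implements NamedStrategy {
    public String describe() {
        return "example";
    }
}

class StrategyLoaderSketch {
    /** Instantiates a strategy from its class name, failing if the class cannot be loaded. */
    static NamedStrategy load(String className) {
        try {
            Class<?> strategyClass = Class.forName(className);
            return (NamedStrategy) strategyClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Reached, for example, when the class is not on the class path.
            throw new IllegalStateException("Cannot load strategy " + className, e);
        }
    }
}
```

If the JAR that contains the class is missing from the path the loader searches, the lookup fails before any record is processed, which is why the two JAR files belong side by side.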

© 2021 MongoDB, Inc.