
Replicate Data with a Change Data Capture Handler

On this page

  • Overview
  • Requirements
  • Tutorial
  • Download Source Files
  • Set Up the Environment
  • Configure Sink Connector
  • Change Data in MongoDB
  • Stop the Pipeline
  • Further Reading

Learn how to use a change data capture (CDC) handler to replicate data with the MongoDB Kafka Connector. A CDC handler is a program that translates CDC events into MongoDB write operations. Use a CDC handler when you need to reproduce the changes that occur in one datastore in another datastore.

In this tutorial, you use a CDC handler to make two MongoDB collections contain the same documents.

If you want to learn more about how CDC handlers work, rather than follow a tutorial that demonstrates how to use them, see the Change Data Capture Handlers guide.
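To make the idea concrete, the following sketch pairs an abridged change event for an insert with one write operation a CDC handler could apply to reproduce it in another collection. The document and its _id value are hypothetical, and real change events (like the ones you produce later in this tutorial) contain additional fields:

// Abridged change event describing an insert on the Source collection
{
  "operationType": "insert",
  "fullDocument": { "_id": 1, "proclaim": "Hello World!" },
  "ns": { "db": "CDCTutorial", "coll": "Source" },
  "documentKey": { "_id": 1 }
}

// One write operation a CDC handler could apply to replicate the event
db.Destination.replaceOne(
  { "_id": 1 },                              // match on the event's documentKey
  { "_id": 1, "proclaim": "Hello World!" },  // replace with the event's fullDocument
  { "upsert": true }
);

Upserting on the documentKey keeps replay idempotent: applying the same insert event more than once leaves the destination collection in the same state.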

Download and install the following packages:

Tip
Read the Docker Documentation

This guide uses the following Docker-specific terminology:

Learn more about Docker from the Docker official Get Started Guide.

The sandbox uses Docker for convenience and consistency. To learn more about deployment options for Apache Kafka, see the following resources:

1. Download Source Files

We provide a sandbox that includes the services you need to build your sample data pipeline. To download the sandbox, clone the tutorial repository to your development environment. Then navigate to the directory that corresponds to the Replicate Data with a Change Data Capture Handler tutorial. If you use bash or a similar shell, use the following commands:

git clone https://github.com/mongodb-university/kafka-edu.git
cd kafka-edu/docs-examples/examples/v1.6/cdc-tutorial
2. Set Up the Environment

The sample pipeline consists of the following tools running in Docker containers on your computer:

  • A MongoDB replica set
  • An Apache Kafka instance
  • A Kafka Connect instance with the MongoDB Kafka Connector installed
  • A Zookeeper instance (Zookeeper is a dependency of Apache Kafka)

The pipeline comes with a source connector already installed. The source connector writes change event documents that correspond to changes in the Source collection of the CDCTutorial database to a Kafka topic. The configuration for the source connector is as follows:

name="mongo-source-CDCTutorial-eventroundtrip"
connector.class="com.mongodb.kafka.connect.MongoSourceConnector"
connection.uri="mongodb://mongo1:27017,mongo2:27017,mongo3:27017"
database="CDCTutorial"
collection="Source"

To download and start the pipeline, execute the following command from within the root directory of your cloned repository:

docker-compose -p cdc-tutorial up -d
Note
How long does the download take?

In total, the Docker images for this tutorial require about 2.4 GB of space. The following list shows how long it takes to download the images with different internet speeds:

  • 40 megabits per second: 8 minutes
  • 20 megabits per second: 16 minutes
  • 10 megabits per second: 32 minutes

Once the preceding command finishes and the pipeline starts, you should see output that looks like this:

...
Creating mongo1 ... done
Creating zookeeper ... done
Creating broker ... done
Creating mongo1-setup ... done
Creating connect ... done
Creating shell ... done
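If you want to confirm that every container in the pipeline started, you can list the containers for the project with Docker Compose (an optional check that is not part of the tutorial steps):

docker-compose -p cdc-tutorial ps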

Open a second terminal window. You will use one terminal to monitor your topic, and the other terminal to perform write operations on your database. Enter the following command into both terminals:

docker exec -it shell /bin/bash

Once you have entered the preceding command into both terminal windows, your terminals should look like the following image:

[Image: Arrangement of two shells for this tutorial.]

Arrange your two terminal windows to match the preceding image so that both are visible and one is above the other.
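If you want to confirm that the preinstalled source connector is registered before you continue, you can list the connectors on the Kafka Connect instance from either terminal. This is an optional check that uses the same Kafka Connect REST endpoint you use to add the sink connector later in this tutorial:

curl http://connect:8083/connectors -w "\n"

The output should include mongo-source-CDCTutorial-eventroundtrip.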

To monitor your topic, type the following command in your upper terminal window:

kafkacat -b broker:29092 -C -t CDCTutorial.Source
Important
Broker Leader Not Available

If you receive the following output, run the preceding kafkacat command a second time:

% Error: Topic CDCTutorial.Source error: Broker: Leader not available

Once you enter the preceding command, you should see output that looks like this:

% Reached end of topic CDCTutorial.Source [0] at offset 0

Your upper terminal window is now listening to the CDCTutorial.Source Kafka topic. Changes to your topic will print in this terminal window.
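kafkacat can also print metadata about the brokers and topics it can reach, which is a quick way to check connectivity if the topic does not behave as expected (an optional check; the -L flag lists cluster metadata):

kafkacat -b broker:29092 -L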

To learn more about kafkacat, see the kcat repository on GitHub.

3. Configure Sink Connector

To configure your sink connector, execute the following command in your lower terminal window:

curl -X POST -H "Content-Type: application/json" --data '
{ "name": "mongo-sink-CDCTutorial-eventroundtrip",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":"1",
"topics":"CDCTutorial.Source",
"change.data.capture.handler":"com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
"connection.uri":"mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
"database":"CDCTutorial",
"collection":"Destination"}
}' http://connect:8083/connectors -w "\n" | jq .

After you run the preceding command, you should see the following output:

...
{
"name": "mongo-sink-CDCTutorial-eventroundtrip",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max": "1",
"topics": "CDCTutorial.Source",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
"connection.uri": "mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
"database": "CDCTutorial",
"collection": "Destination",
"name": "mongo-sink-CDCTutorial-eventroundtrip"
},
"tasks": [],
"type": "sink"
}

This configuration makes your sink connector do the following things:

  • Listen for events on the CDCTutorial.Source topic
  • Apply a change data capture handler to documents it receives from the CDCTutorial.Source topic
  • Write received documents to the Destination collection in the CDCTutorial database
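If you want to confirm that the sink connector and its task are running, you can ask Kafka Connect for the connector's status (an optional check using the Kafka Connect REST API):

curl http://connect:8083/connectors/mongo-sink-CDCTutorial-eventroundtrip/status -w "\n" | jq .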
4. Change Data in MongoDB

From your lower terminal, enter the MongoDB Shell with the following command:

mongosh mongodb://mongo1:27017/?replicaSet=rs0

Once you are in the MongoDB Shell, your terminal prompt should look like this:

rs0 [primary] test>

Insert a document into the Source collection of the CDCTutorial database with the following commands:

use CDCTutorial
db.Source.insert({proclaim: "Hello World!"});

Once you insert the document, you should see output that resembles the following in your upper shell:

{"schema":{"type":"string","optional":false},
"payload":{"_id": {"_data": "8260...4"},
"operationType": "insert",
"clusterTime": {"$timestamp": {"t": 1611348141, "i": 2}},
"fullDocument": {"_id": {"$oid": "600b38ad..."}, "proclaim": "Hello World!"},
"ns": {"db": "CDCTutorial", "coll": "Source"},
"documentKey": {"_id": {"$oid": "600b38a...."}}}}

In your lower shell, inspect the Destination collection with the following command:

db.Destination.find()

You should see output that looks like this:

{ _id: ..., proclaim: 'Hello World!' }
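If you want to watch the handler replicate more than one document at a time, you can insert several documents from your lower shell before moving on. This is an optional aside, and the deleteMany command in the next step removes these documents as well:

db.Source.insertMany([
  { proclaim: "Hello again!" },
  { proclaim: "Hello once more!" }
]);
db.Destination.find()

Each inserted document produces its own change event, so additional event documents appear in your upper shell and matching documents appear in the Destination collection.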

Try deleting your document from your Source collection with the following command:

db.Source.deleteMany({})

Once you delete the document, you should see output that resembles the following in your upper shell:

{"schema":{"type":"string","optional":false},"payload":"{\"_id\":
{\"_data\": \"826138BCBA000000012B022C0100296E5A10041FD232D9ECE347FFABA837E9AB05D95046645F696400646138BCAF2A52D9E0D299336F0004\"},
\"operationType\": \"delete\", \"clusterTime\": {\"$timestamp\": {\"t\":
1631108282, \"i\": 1}}, \"ns\": {\"db\": \"CDCTutorial\", \"coll\":
\"Source\"}, \"documentKey\": {\"_id\": {\"$oid\":
\"6138bcaf2a52d9e0d299336f\"}}}"}

Now see how many documents are in your Destination collection:

db.Destination.count()

You should see the following output:

0
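The handler replays update events as well. If you want to try one before you exit, insert a new document and then modify it from your lower shell (a short optional experiment; the Source collection is empty at this point, so insert first):

db.Source.insertOne({ proclaim: "Hello World!" });
db.Source.updateOne({ proclaim: "Hello World!" }, { $set: { proclaim: "Hello Kafka!" } });
db.Destination.find()

The update produces another change event on the topic, and the matching document in the Destination collection picks up the new proclaim value.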

Once you have finished exploring the connector in the MongoDB Shell, you can exit the MongoDB Shell with the following command:

exit

Explore the sample pipeline on your own. Here are some challenges to get you started:

  • Add a new source connector that writes to the CDCTutorial.Source topic. Configure your new connector to write insert events. To learn how to filter event types in your connector, see the Customize a Pipeline to Filter Change Events guide.
  • Add a new source connector configured with the publish.full.document.only=true option that writes to the CDCTutorial.Source topic. Publish a document with your new source connector. This produces an error in your sink connector and your sink connector stops. Configure your sink connector to write errant messages to a topic rather than stop. To learn how to write errant messages to a topic, see Write Errors and Errant Messages to a Topic. A starting-point sketch for the new source connector follows this list.
  • Remove the change.data.capture.handler from your sink connector. Add the source connector from the tutorial to Kafka Connect if it's not already added. Insert a document into MongoDB as done in the tutorial. Look at the change event document your sink connector inserts into MongoDB.
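The following sketch shows one way to register the additional source connector for the publish.full.document.only challenge. The connector name here is hypothetical, and the remaining values mirror the connectors you already used in this tutorial:

curl -X POST -H "Content-Type: application/json" --data '
{ "name": "mongo-source-CDCTutorial-fulldoc",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri":"mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
"database":"CDCTutorial",
"collection":"Source",
"publish.full.document.only":"true"}
}' http://connect:8083/connectors -w "\n" | jq .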
5. Stop the Pipeline

To conserve resources on your computer, make sure to stop the sample pipeline once you are done exploring this example.

Before you stop the sample pipeline, make sure to exit your Docker shell. You can exit your Docker shell by running the following command in your lower terminal:

exit

To stop the sample pipeline and remove containers and images, run the following command:

docker-compose -p cdc-tutorial down --rmi 'all'
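If you would rather keep the downloaded images so that a later run of the tutorial does not have to download them again, you can stop the pipeline and remove only the containers (a minor variation on the preceding command):

docker-compose -p cdc-tutorial down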

To learn more about the topics discussed in this tutorial, see the following MongoDB Kafka Connector guides:
