
Kafka Sink Change Data Capture

Change data capture (CDC) is an architecture that converts changes in a source database into event streams. You can capture CDC events with the MongoDB Kafka sink connector and perform corresponding insert, update, and delete operations to a destination MongoDB cluster.

You can also handle CDC events from other event producers; this guide covers Debezium and Qlik Replicate in the sections below.

To configure your MongoDB Kafka sink connector to handle CDC events from a Kafka topic, update your configuration to include the following:

change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler

The ChangeStreamHandler class instructs the sink connector to process change events that are in the change stream response document format. You can use a MongoDB Kafka source connector to configure the change stream data that you want to publish to specific topics.
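As a sketch, a source connector that publishes full change stream documents in the format the ChangeStreamHandler expects might look like the following. The hostnames, database, and collection names are placeholders:

```json
{
  "name": "mongo-source-changestream",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.source.MongoSourceConnector",
    "connection.uri": "mongodb://<source-hostname>:27017",
    "database": "kafkaconnect",
    "collection": "mongosrc",
    "publish.full.document.only": "false"
  }
}
```

Setting publish.full.document.only to false keeps the complete change stream response document in the published record, which the ChangeStreamHandler needs to determine the operation type.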

Remember to specify the topic and the destination in the following configuration properties:

  • topics
  • connection.uri
  • database
  • collection

For more information on the properties above, see our guide on Kafka Sink Connector Configuration Properties.
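In a properties-file deployment, those settings might look like the following sketch. All values are placeholders to adapt to your environment:

```properties
# Topic(s) to consume CDC events from
topics=myreplset.kafkaconnect.mongosrc

# Destination MongoDB cluster, database, and collection
connection.uri=mongodb://<hostname>:27017
database=kafkaconnect
collection=mongosink

# CDC handler for change stream event documents
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler
```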

The following sample JSON payload instantiates a new connector that uses the ChangeStreamHandler with a specified CDC configuration when posted to the Kafka Connect REST endpoint:

{
  "name": "mongo-sink-changestreamhandler-cdc",
  "config": {
    "connection.uri": "mongodb://<hostname>:27017/kafkaconnect?w=1&journal=true",
    "topics": "myreplset.kafkaconnect.mongosrc",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
    "connector.class": "com.mongodb.kafka.connect.sink.MongoSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://<schema-registry-hostname>:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://<schema-registry-hostname>:8081",
    "collection": "mongosink"
  }
}
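A payload like the one above is typically submitted to a Kafka Connect worker over its REST API. The sketch below saves a trimmed copy of the configuration, validates it locally, and shows the POST command; the Connect hostname and file path are placeholders:

```shell
# Save a trimmed sink configuration to a file (values are placeholders).
cat > /tmp/mongo-sink-cdc.json <<'EOF'
{
  "name": "mongo-sink-changestreamhandler-cdc",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.sink.MongoSinkConnector",
    "connection.uri": "mongodb://<hostname>:27017/kafkaconnect?w=1&journal=true",
    "topics": "myreplset.kafkaconnect.mongosrc",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
    "collection": "mongosink"
  }
}
EOF

# Validate the payload locally before posting it.
python3 -m json.tool /tmp/mongo-sink-cdc.json > /dev/null && echo "payload is valid JSON"

# Submit to a running Connect worker (uncomment and set the hostname):
# curl -X POST "http://<connect-hostname>:8083/connectors" \
#      -H "Content-Type: application/json" \
#      --data @/tmp/mongo-sink-cdc.json
```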
Important
Avoid Exposing Your Authentication Credentials

To avoid exposing your authentication credentials in your connection.uri setting, use a ConfigProvider and set the appropriate configuration parameters.
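One common approach is Kafka's built-in FileConfigProvider, which resolves placeholders at runtime from a file kept outside the connector configuration. A sketch, with illustrative file paths and key names:

```properties
# In the Connect worker configuration, register the provider:
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# In the connector configuration, reference a key stored in a separate file.
# For example, /etc/kafka/secrets.properties might contain a line such as:
#   mongodb.uri=mongodb://<username>:<password>@<hostname>:27017/kafkaconnect
connection.uri=${file:/etc/kafka/secrets.properties:mongodb.uri}
```

With this setup, the secret file can be locked down with filesystem permissions, and REST API responses show the unresolved placeholder rather than the credentials.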

The MongoDB Kafka sink connector can also process event streams using Debezium as an event producer for source databases including MongoDB, MySQL, and PostgreSQL.

You can configure the sink connector to process data from a CDC stream using one of the included handlers for Debezium or a custom handler that extends the abstract class CdcHandler. The included handlers for Debezium are in the MongoDB Kafka connector repository.

To create a CDC handler, specify the following configuration information:

  • Full class name of the CDC handler in the change.data.capture.handler property. Once this configuration setting is specified, the connector will run in CDC operation mode.
  • Topics on which the connector should listen for data in the topics property.
  • MongoDB collection to write data to in the collection property.

The following sample JSON payload instantiates a new connector using Debezium with a specified CDC configuration when posted to the Kafka Connect REST endpoint:

{
  "name": "mongo-sink-debezium-cdc",
  "config": {
    "connection.uri": "mongodb://<mongodb-hostname>:27017/kafkaconnect?w=1&journal=true",
    "topics": "myreplset.kafkaconnect.mongosrc",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
    "connector.class": "com.mongodb.kafka.connect.sink.MongoSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://<schema-registry-hostname>:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://<schema-registry-hostname>:8081",
    "collection": "mongosink"
  }
}
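For context, the MongoDbHandler consumes records in Debezium's MongoDB event envelope. A simplified insert event might look like the following sketch; the field values are illustrative, and the actual envelope carries additional source metadata:

```json
{
  "op": "c",
  "after": "{\"_id\": 1, \"name\": \"example\"}",
  "source": {
    "connector": "mongodb",
    "rs": "myreplset",
    "db": "kafkaconnect",
    "collection": "mongosrc"
  },
  "ts_ms": 1640995200000
}
```

The handler maps the op field ("c" for create, "u" for update, "d" for delete) to the corresponding insert, update, or delete operation on the destination collection.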

The MongoDB Kafka sink connector can also process event streams using Qlik Replicate as an event producer for several data sources, including:

  • Oracle
  • Postgres
  • Microsoft SQL Server

For a complete list of supported sources for Qlik Replicate CDC events, see the Qlik Replicate Source Endpoint Support Matrix.

You can configure the sink connector to process data from a CDC stream using the included handler for Qlik Replicate or a custom handler that extends the abstract class CdcHandler. The included handler for Qlik Replicate is in the MongoDB Kafka connector repository.

To create a CDC handler, specify the following configuration information:

  • Full class name of the CDC handler in the change.data.capture.handler property. Once this configuration setting is specified, the connector will run in CDC operation mode.
  • Topics on which the connector should listen for data in the topics property.
  • MongoDB collection to write data to in the collection property.

The following sample JSON payload instantiates a new connector using Qlik Replicate with a specified CDC configuration when posted to the Kafka Connect REST endpoint:

{
  "name": "mongo-sink-qlik-replicate-cdc",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.sink.MongoSinkConnector",
    "connection.uri": "mongodb://<mongodb-hostname>:27017/kafkaconnect?w=1&journal=true",
    "collection": "mongosink",
    "topics": "myreplset.kafkaconnect.rdbmssrc",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.qlik.rdbms.RdbmsHandler",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
