Kafka Sink Change Data Capture

Change Data Capture Mode

Change data capture (CDC) is an architecture that converts changes in a database into event streams. The MongoDB Kafka sink connector can process event streams using Debezium as an event producer for the following source databases:

  • MongoDB
  • MySQL
  • PostgreSQL
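
For example, a Debezium MongoDB source connector produces change event values roughly like the following simplified insert event. This is an illustrative sketch only; the exact fields and their layout vary by source database and Debezium version.

{
  "op": "c",
  "ts_ms": 1563461336000,
  "after": "{\"_id\": {\"$oid\": \"5d2d3bff6b49e75fd0e86cee\"}, \"name\": \"example document\"}",
  "source": {
    "connector": "mongodb",
    "rs": "myreplset",
    "db": "kafkaconnect",
    "collection": "mongosrc"
  }
}

The configured CDC handler translates each such event into the corresponding insert, update, or delete against the sink collection.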

CDC Handler Configuration

You can configure the sink connector to process data from a CDC stream using one of the included handlers for Debezium or a custom handler that extends the abstract class CdcHandler.
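
If the bundled Debezium handlers do not match your event format, you can plug in your own class via change.data.capture.handler. The following is a minimal sketch of such a handler. The class name is hypothetical, and the constructor argument (MongoSinkTopicConfig here) and handle method signature can differ between connector versions, so check them against the version of the connector you are running.

import java.util.Optional;

import org.bson.BsonDocument;

import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.cdc.CdcHandler;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;

// Hypothetical handler: upserts the event value keyed by the record key and
// deletes the matching document when the event carries no value (tombstone).
public class UpsertOrDeleteCdcHandler extends CdcHandler {

    public UpsertOrDeleteCdcHandler(final MongoSinkTopicConfig config) {
        super(config);
    }

    @Override
    public Optional<WriteModel<BsonDocument>> handle(final SinkDocument doc) {
        BsonDocument keyDoc = doc.getKeyDoc().orElseGet(BsonDocument::new);
        Optional<BsonDocument> valueDoc = doc.getValueDoc();

        if (!valueDoc.isPresent() || valueDoc.get().isEmpty()) {
            // No value in the event: treat it as a delete for the given key.
            return Optional.of(new DeleteOneModel<BsonDocument>(keyDoc));
        }

        // Otherwise replace (or insert) the document identified by the key.
        return Optional.of(new ReplaceOneModel<BsonDocument>(keyDoc, valueDoc.get(),
                new ReplaceOptions().upsert(true)));
    }
}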

To create a CDC handler, specify the following configuration information:

  • The fully qualified class name of the CDC handler in the change.data.capture.handler property. Once this setting is specified, the connector runs in CDC operation mode.
  • The topics on which the connector listens for data, in the topics property.
  • The MongoDB collection to write data to, in the collection property.
  • The converters required to handle the data format, in the [key|value].converter properties. Both JSON with schema and Avro formats are supported (see the converter fragment after this list).
  • Any post processors necessary to modify the record before saving it to MongoDB.
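
For example, if the topics carry JSON with schema rather than Avro, the converter portion of the configuration would typically look like the following fragment; the remaining properties are the same as in the full payload below.

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"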

The following sample JSON payload creates a new connector with the specified CDC configuration when posted to the Kafka Connect REST endpoint:

{
  "name": "mongo-sink-debezium-cdc",
  "config": {
    "connection.uri": "mongodb://mongodb:27017/kafkaconnect?w=1&journal=true",
    "topics": "myreplset.kafkaconnect.mongosrc",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
    "connector.class": "com.mongodb.kafka.connect.sink.MongoSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "collection": "mongosink"
  }
}