
Kafka Sink Change Data Capture

Change data capture (CDC) is an architecture that converts changes in a database into event streams. The MongoDB Kafka sink connector can process event streams using Debezium as an event producer for the following source databases:

You can configure the sink connector to process data from a CDC stream using one of the included handlers for Debezium or a custom handler that extends the abstract class CdcHandler.

To configure the connector to use a CDC handler, specify the following settings:

  • The full class name of the CDC handler in the change.data.capture.handler property. Setting this property switches the connector to CDC operation mode.
  • The topics the connector should listen to for data in the topics property.
  • The MongoDB collection to write data to in the collection property.
  • The converters required to handle the data formats in the key.converter and value.converter properties. Both JSON with Schema and Avro formats are supported.
  • Any post processors needed to modify the record before it is saved to MongoDB.

The following sample JSON payload instantiates a new connector with a specified CDC configuration when posted to the Kafka Connect REST endpoint:

{
  "name": "mongo-sink-debezium-cdc",
  "config": {
    "connection.uri": "mongodb://mongodb:27017/kafkaconnect?w=1&journal=true",
    "topics": "myreplset.kafkaconnect.mongosrc",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
    "connector.class": "com.mongodb.kafka.connect.sink.MongoSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "collection": "mongosink"
  }
}
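
As a rough illustration of posting this payload, the Python sketch below builds a POST request to the /connectors path of the Kafka Connect REST API. The base URL http://localhost:8083 is an assumption (8083 is the usual default port for Kafka Connect), and the helper names build_create_request and create_connector are hypothetical. Adjust both for your deployment.

```python
import json
import urllib.request

# Assumption: the Kafka Connect worker's REST API is reachable here.
CONNECT_URL = "http://localhost:8083"

# The sample payload shown above, expressed as a Python dict.
payload = {
    "name": "mongo-sink-debezium-cdc",
    "config": {
        "connection.uri": "mongodb://mongodb:27017/kafkaconnect?w=1&journal=true",
        "topics": "myreplset.kafkaconnect.mongosrc",
        "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
        "connector.class": "com.mongodb.kafka.connect.sink.MongoSinkConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "collection": "mongosink",
    },
}


def build_create_request(payload, base_url=CONNECT_URL):
    """Build (without sending) the POST /connectors request for a payload."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(payload, base_url=CONNECT_URL):
    """Send the request; this needs a running Kafka Connect worker."""
    request = build_create_request(payload, base_url)
    with urllib.request.urlopen(request) as response:
        return response.status, response.read().decode("utf-8")
```

Calling create_connector(payload) against a running worker submits the configuration. Building the request separately makes it possible to inspect the URL, headers, and body before anything is sent.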
Important
Avoid Exposing Your Authentication Credentials

To avoid exposing your authentication credentials in your connection.uri setting, use a ConfigProvider and set the appropriate configuration parameters.
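
One possible approach, sketched below in Python, is to swap the literal URI for a placeholder that Kafka's FileConfigProvider resolves at runtime. The sketch assumes the worker configuration enables that provider under the alias file (config.providers=file and config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider). The secrets file path, the mongodb_uri key, and the helper names are hypothetical.

```python
from urllib.parse import urlsplit


def has_inline_credentials(uri):
    """Return True if the URI embeds a username or password."""
    parts = urlsplit(uri)
    return parts.username is not None or parts.password is not None


def externalize_uri(config, secrets_path, key):
    """Return a copy of config whose connection.uri is a placeholder.

    The placeholder uses the ${provider:path:key} form and assumes a
    ConfigProvider registered under the alias "file" on the worker.
    """
    updated = dict(config)
    updated["connection.uri"] = "${file:%s:%s}" % (secrets_path, key)
    return updated


# Hypothetical connector settings with credentials embedded in the URI.
config = {
    "connection.uri": "mongodb://user:secret@mongodb:27017/kafkaconnect",
    "collection": "mongosink",
}

# Hypothetical secrets file that would hold a line "mongodb_uri=<full URI>".
safe_config = externalize_uri(config, "/etc/kafka/secrets.properties", "mongodb_uri")
```

With this change the connector configuration carries only the placeholder, and the worker reads the real URI from the secrets file when the connector starts.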
