
Kafka Sink Connector Data Formats

The MongoDB Kafka Sink Connector converter setting specifies the deserialization method for data it reads from a topic. The converter can deserialize the following data formats:

  • AVRO: An open source serialization system that provides a compact binary format and a JSON-like API. Integrates with the Confluent Schema Registry to manage schema definitions.
  • JSON with Schema: JSON record structure with explicit schema information to ensure the data matches the expected format.
  • JSON (plain): JSON record structure without an attached schema.
  • RAW JSON: Serialized as a String. The JSON structure is not managed by Kafka Connect.
Note

Even when you specify the StringConverter format, fields in RAW JSON mode must contain valid JSON for the connector to parse them correctly.

For more information on Kafka data serialization, see Kafka Connect Serialization Explained.

The following configuration provides example settings that use the AVRO data format with a Schema Registry:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

For more information on using a Schema Registry, see Schema Management.
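
Converter settings can also be supplied per connector rather than at the worker level. The following is a minimal sketch of a sink connector properties file that sets the AVRO converters inline; the connector name, topic, connection URI, database, and collection values are placeholders for illustration:

name=mongodb-sink-avro-example
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
topics=example-topic
connection.uri=mongodb://localhost:27017
database=example_db
collection=example_collection
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081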

The following configuration provides example settings that use the JSON with schema data format. The Kafka topic data must be in JSON format and contain the top-level objects schema and payload.

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
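
For illustration, a minimal sketch of a topic record in this format follows; the field names are hypothetical. The schema object describes the structure of the record and the payload object carries the actual data:

{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      { "field": "name", "type": "string", "optional": false },
      { "field": "age", "type": "int32", "optional": true }
    ]
  },
  "payload": {
    "name": "Grace",
    "age": 30
  }
}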

The following configuration provides example settings that use the JSON without schema data format. The Kafka topic data must be in JSON format.

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
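
For illustration, an equivalent record without an attached schema is just the data itself; the field names below are hypothetical:

{
  "name": "Grace",
  "age": 30
}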
Note: Choose the appropriate data format

When you specify JSON without schema, any JSON objects named schema or payload are read as ordinary fields of the record rather than interpreted as a validation schema.

The following configuration provides example settings that use the RAW JSON data format.

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
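
With the StringConverter, the connector receives each record value as a plain string. As noted above, that string must still contain valid JSON, for example (hypothetical fields):

{"name": "Grace", "age": 30}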

Once the converter has deserialized the data from the Kafka topic, Kafka Connect creates a SinkRecord object.

The MongoDB Kafka Connector converts the SinkRecord into a SinkDocument, which contains the key and value in BSON format. The converter determines the types from the schema, if one is provided.
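
The following Java sketch is illustrative only, not the connector's internal code; it uses the org.bson types from the MongoDB Java driver to show the BSON document that corresponds to a simple record value:

import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;

public class SinkDocumentShape {
    public static void main(String[] args) {
        // A record value such as {"name": "Grace", "age": 30} ends up in the
        // SinkDocument as a BSON document equivalent to the following:
        BsonDocument value = new BsonDocument()
                .append("name", new BsonString("Grace"))
                .append("age", new BsonInt32(30));

        // Print the JSON representation of the BSON document.
        System.out.println(value.toJson());
    }
}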

The connector supports all the core schema types listed in Schema.Type:

  • ARRAY
  • BOOLEAN
  • BYTES
  • FLOAT32
  • FLOAT64
  • INT16
  • INT32
  • INT64
  • INT8
  • MAP
  • STRING
  • STRUCT

The MongoDB Kafka Connector also supports the following AVRO logical types:

  • Decimal
  • Date
  • Time (millis/micros)
  • Timestamp (millis/micros)

For a sample AVRO schema that uses logical types, see AVRO Logical Type Example.

The converter handles schemas with nested key or value structures. The following is an example AVRO schema with nested fields:

{
  "type": "record",
  "name": "Customer",
  "namespace": "com.mongodb.kafka.data.kafka.avro",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int"
    },
    {
      "name": "active",
      "type": "boolean"
    },
    {
      "name": "address",
      "type": {
        "type": "record",
        "name": "AddressRecord",
        "fields": [
          {
            "name": "city",
            "type": "string"
          },
          {
            "name": "country",
            "type": "string"
          }
        ]
      }
    },
    {
      "name": "food",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "data",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "Whatever",
          "fields": [
            {
              "name": "k",
              "type": "string"
            },
            {
              "name": "v",
              "type": "int"
            }
          ]
        }
      }
    },
    {
      "name": "lut",
      "type": {
        "type": "map",
        "values": "double"
      }
    },
    {
      "name": "raw",
      "type": "bytes"
    }
  ]
}

In the above schema example, the address field is a document with two sub-fields, city and country. It's also possible to specify those fields with dot notation:

{
  "name": "address.city",
  "type": "string"
},
{
  "name": "address.country",
  "type": "string"
}

Dot notation allows you to specify nested fields without using separate lines for the top-level document and its sub-documents.

The following AVRO schema demonstrates the use of each supported logical type (Decimal, Date, Time, and Timestamp):

{
  "type": "record",
  "name": "MyLogicalTypesRecord",
  "namespace": "com.mongodb.kafka.data.kafka.avro",
  "fields": [
    {
      "name": "myDecimalField",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "connect.parameters": {
          "scale": "2"
        }
      }
    },
    {
      "name": "myDateField",
      "type": {
        "type": "int",
        "logicalType": "date"
      }
    },
    {
      "name": "myTimeMillisField",
      "type": {
        "type": "int",
        "logicalType": "time-millis"
      }
    },
    {
      "name": "myTimeMicrosField",
      "type": {
        "type": "long",
        "logicalType": "time-micros"
      }
    },
    {
      "name": "myTimestampMillisField",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "myTimestampMicrosField",
      "type": {
        "type": "long",
        "logicalType": "timestamp-micros"
      }
    }
  ]
}
Note

If you compile your schema using AVRO code generation for your Kafka producer application, your logical types are mapped to the following non-standard Java classes:

Schema Type         Java Type
date                org.joda.time.LocalDate
time-millis         org.joda.time.LocalTime
time-micros         long
timestamp-millis    org.joda.time.DateTime
timestamp-micros    long