
Kafka Sink Connector Data Formats

Supported Data Formats

The converter setting of the MongoDB Kafka Sink Connector specifies how the connector deserializes the data it reads from a Kafka topic. The converter can deserialize the following data formats:

AVRO: An open source serialization system that provides a compact binary format and a JSON-like API. Integrates with the Confluent Schema Registry to manage schema definitions.

JSON with Schema: JSON record structure with explicit schema information to ensure the data matches the expected format.

JSON (plain): JSON record structure without an attached schema.

RAW JSON: Serialized as a String. The JSON structure is not managed by Kafka Connect.

Note

Even when you specify the StringConverter, fields in RAW JSON mode must contain valid JSON for the connector to parse them correctly.

For more information on Kafka data serialization, see Kafka Connect Serialization Explained.

Configuration Example for AVRO

The following configuration provides example settings that use the AVRO data format with a Schema Registry:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

For more information on using a Schema Registry, see Schema Management.
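
For context, these converter properties are set alongside the rest of the sink connector configuration. The following is a minimal sketch in which the connector name, topic, connection URI, database, and collection values are placeholders:

name=mongo-sink-avro
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
topics=example-topic
connection.uri=mongodb://localhost:27017
database=exampleDb
collection=exampleCollection

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081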

Configuration Example for JSON with Schema

The following configuration provides example settings that use the JSON with schema data format. The Kafka topic data must be in JSON format and contain top-level schema and payload objects.

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
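
When schemas.enable is true, each message value must wrap the data in top-level schema and payload objects. The following illustrative message uses hypothetical field names and values:

{
  "schema": {
    "type": "struct",
    "name": "Customer",
    "optional": false,
    "fields": [
      { "field": "name", "type": "string", "optional": false },
      { "field": "age", "type": "int32", "optional": false }
    ]
  },
  "payload": {
    "name": "Grace",
    "age": 42
  }
}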

Configuration Example for JSON without Schema

The following configuration provides example settings that use the JSON without schema data format. The Kafka topic data must be in JSON format.

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

Choose the appropriate data format

When you specify JSON without Schema, any schema-related objects in the message, such as top-level schema or payload fields, are read as ordinary fields of the document rather than interpreted as a validation schema.
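
For example, with schemas.enable set to false, the connector does not interpret the following message (illustrative values) as a schema-and-payload envelope; it produces a document that contains literal schema and payload fields:

{
  "schema": { "type": "struct" },
  "payload": { "name": "Grace" }
}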

Configuration Example for RAW JSON

The following configuration provides example settings that use the RAW JSON data format.

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false

value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
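
With the StringConverter, each record reaches the connector as a plain string, and that string must itself be valid JSON. For example, a record value such as the following (illustrative) parses into a single document:

{"name": "Grace", "age": 42, "active": true}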

Supported Sink Record Structure

Once the converter has deserialized the data from the Kafka topic, Kafka Connect creates a SinkRecord object.

The MongoDB Kafka Connector converts the SinkRecord into a SinkDocument, which contains the key and value in BSON format. The converter determines the BSON types using the schema, if one is provided.
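
As a minimal sketch (not connector code), the following Java snippet uses the standard Kafka Connect classes SchemaBuilder, Struct, and SinkRecord to show the kind of record the connector receives; the topic, schema, and field names are hypothetical, and the BSON shown in the comment is only the approximate result:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

public class SinkRecordSketch {
    public static void main(String[] args) {
        // Hypothetical value schema for illustration only.
        Schema valueSchema = SchemaBuilder.struct().name("Customer")
                .field("name", Schema.STRING_SCHEMA)
                .field("age", Schema.INT32_SCHEMA)
                .field("active", Schema.BOOLEAN_SCHEMA)
                .build();

        Struct value = new Struct(valueSchema)
                .put("name", "Grace")
                .put("age", 42)
                .put("active", true);

        // Kafka Connect wraps the deserialized key and value in a SinkRecord
        // before handing it to the sink connector.
        SinkRecord record = new SinkRecord(
                "example-topic",                    // topic (placeholder)
                0,                                  // partition
                Schema.STRING_SCHEMA, "customer-1", // key schema and key
                valueSchema, value,                 // value schema and value
                100L);                              // offset

        // The sink connector converts the record's value into a BSON document
        // roughly equivalent to: { "name": "Grace", "age": 42, "active": true }
        System.out.println(record.value());
    }
}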

The connector supports all the core schema types listed in Schema.Type:

  • INT8
  • INT16
  • INT32
  • INT64
  • FLOAT32
  • FLOAT64
  • BOOLEAN
  • STRING
  • BYTES
  • ARRAY
  • MAP
  • STRUCT

The MongoDB Kafka Connector also supports the following AVRO logical types:

  • Decimal
  • Date
  • Time (millis/micros)
  • Timestamp (millis/micros)

For a sample AVRO schema that uses logical types, see AVRO Logical Type Example.

Nested Field Schema Example

The converter handles schemas with nested key or value structures. The following is an example AVRO schema with nested fields:

{
  "type": "record",
  "name": "Customer",
  "namespace": "com.mongodb.kafka.data.kafka.avro",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int"
    },
    {
      "name": "active",
      "type": "boolean"
    },
    {
      "name": "address",
      "type": {
        "type": "record",
        "name": "AddressRecord",
        "fields": [
          {
            "name": "city",
            "type": "string"
          },
          {
            "name": "country",
            "type": "string"
          }
        ]
      }
    },
    {
      "name": "food",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "data",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "Whatever",
          "fields": [
            {
              "name": "k",
              "type": "string"
            },
            {
              "name": "v",
              "type": "int"
            }
          ]
        }
      }
    },
    {
      "name": "lut",
      "type": {
        "type": "map",
        "values": "double"
      }
    },
    {
      "name": "raw",
      "type": "bytes"
    }
  ]
}
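
A record that conforms to this schema would be written to MongoDB as a document along the following lines (illustrative values; the bytes field is stored as BSON binary data):

{
  "name": "Grace",
  "age": 42,
  "active": true,
  "address": { "city": "Oslo", "country": "Norway" },
  "food": ["pizza", "sushi"],
  "data": [{ "k": "visits", "v": 7 }],
  "lut": { "score": 0.95 },
  "raw": "<binary data>"
}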

AVRO Logical Type Example

The following AVRO schema demonstrates the use of each supported logical type (Decimal, Date, Time, and Timestamp):

{
  "type": "record",
  "name": "MyLogicalTypesRecord",
  "namespace": "com.mongodb.kafka.data.kafka.avro",
    "fields": [
      {
        "name": "myDecimalField",
        "type": {
          "type": "bytes",
          "logicalType": "decimal",
          "connect.parameters": {
            "scale": "2"
          }
        }
      },
      {
        "name": "myDateField",
        "type": {
          "type": "int",
          "logicalType": "date"
        }
      },
      {
        "name": "myTimeMillisField",
        "type": {
          "type": "int",
          "logicalType": "time-millis"
        }
      },
      {
        "name": "myTimeMicrosField",
        "type": {
          "type": "long",
          "logicalType": "time-micros"
        }
      },
      {
        "name": "myTimestampMillisField",
        "type": {
          "type": "long",
          "logicalType": "timestamp-millis"
        }
      },
      {
        "name": "myTimestampMicrosField",
        "type": {
          "type": "long",
          "logicalType": "timestamp-micros"
        }
      }
    ]
  }

Note

If you compile your schema using AVRO code generation for your Kafka producer application, your logical types are mapped to the following non-standard Java classes:

date: org.joda.time.LocalDate
time-millis: org.joda.time.LocalTime
time-micros: long
timestamp-millis: org.joda.time.DateTime
timestamp-micros: long