Docs Menu

Migrate Existing Collections to a Time Series Collection

On this page

  • Example
  • Source Configuration
  • Sink Configuration
  • Output

This guide shows you how to convert an existing MongoDB collection to a time series collection using the Kafka Connector.

Time series collections efficiently store sequences of measurements over a period of time. Time series data consists of any data collected over time, metadata that describes the measurement, and the time of the measurement. With the Kafka Connector, you can configure Kafka topic data to write directly into a time series collection.

Important

Each time series document must contain a BSON Date type for the field specified by the timeseries.timeField setting. For a full list of the time series configuration properties, see our Kafka Sink Connector Configuration Properties page.

The following example converts the StockData collection in the Stocks database to a time series collection. Each document in the collection currently represents stock data in the following format:

{
tx_time: 2021-07-12T05:20:35Z,
company_symbol: 'WSV',
company_name: 'WORRIED SNAKEBITE VENTURES',
price: 21.22,
_id: ObjectId("60eca08f83736ba32c079324")
}

This example uses the Kafka Sink Connector to convert the tx_time field from a String to an ISODate type, which is a BSON Date type.

The Source Connector creates insert events for all the documents in the source collection and publishes them to a topic.

The following configuration instructs the Source Connector to read from the StockData MongoDB collection and publish the data as insert events on the stockdata.Stocks.StockData Kafka topic:

{
"name": "mongo-source-stockdata",
"config": {
"tasks.max":"1",
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"publish.full.document.only": true,
"connection.uri":(MONGODB SOURCE CONNECTION STRING),
"topic.prefix":"stockdata",
"database":"Stocks",
"collection":"StockData",
"copy.existing":"true"
}}
Note

If you insert documents into a collection during the copying process, the connector inserts them after the copying process is complete.

After you start your Source Connector with the preceding configuration, the connector starts the copying process. Once the process is complete, you should see the following message in the log:

Finished copying existing data from the collection(s).

Your data from the StockData MongoDB collection is now available in the stockdata.Stocks.StockData Kafka topic.

The Sink Connector consumes documents from the Kafka topic, converts the tx_time field from a String to an ISODate type and writes them to a time series collection.

The following configuration instructs the Sink Connector to write data from the stockdata.Stocks.StockData Kafka topic to the StockDataMigrate MongoDB time series collection:

{
"name": "mongo-sink-stockdata",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":"1",
"topics":"stockdata.Stocks.StockData",
"connection.uri":(MONGODB SINK CONNECTION STRING),
"database":"Stocks",
"collection":"StockDataMigrate",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"timeseries.timefield":"tx_time",
"timeseries.timefield.auto.convert":"true",
"timeseries.timefield.auto.convert.date.format":"yyyy-MM-dd'T'HH:mm:ss'Z'"
}}
Tip

Alternatively, you can use a TimestampConverter Single Message Transform (SMT) to convert the tx_time field from a String to an ISODate. When using the TimestampConverter SMT, you must define a schema for the data in the Kafka topic.

For information on how to use the TimestampConverter SMT, see the Confluent Documentation on TimestampConverter.

After configuring the Sink Connector and the data finishes processing, documents in the StockDataMigrate collection contain the tx_time field as an ISODate type.

The following shows the format of the documents in the StockDataMigrate time series collection:

{
tx_time: 2021-07-12T20:05:35.000Z,
company_symbol: 'WSV',
company_name: 'WORRIED SNAKEBITE VENTURES',
price: 21.22,
_id: ObjectId("60eca08f83736ba32c079324")
}
Give Feedback
© 2021 MongoDB, Inc.

About

  • Careers
  • Legal Notices
  • Privacy Notices
  • Security Information
  • Trust Center
© 2021 MongoDB, Inc.