
Migrate an Existing Collection to a Time Series Collection

In this tutorial, you can learn how to convert an existing MongoDB collection to a time series collection using the MongoDB Kafka Connector.

Time series collections efficiently store sequences of measurements over a period of time. Time series data consists of measurement data collected over time, metadata that describes the measurement, and the time of the measurement.

You can configure the source connector to read your existing MongoDB collection and the sink connector to write Kafka topic data into a MongoDB time series collection.

To learn more about MongoDB time series collections, see the MongoDB manual page on Time Series Collections.

Suppose you accumulated stock price data in a MongoDB collection and have the following needs:

  • Store the price data more efficiently
  • Maintain the ability to analyze stock performance over time using aggregation operators

After reading about MongoDB time series collections, you decide to migrate your existing collection into a time series one. Learn how to perform this migration in the following sections.

To migrate an existing MongoDB collection to a time series collection, you need to perform the following tasks:

  1. Identify the time field in the existing stock price data document.
  2. Configure a source connector to copy the existing collection data to a Kafka topic.
  3. Configure a sink connector to copy the Kafka topic data to the time series collection.
  4. Verify the connector migrated the data to the time series collection.

Before you create a time series collection, you need to identify the time field. The time field is the document field that MongoDB uses to distinguish the time of the measurement. The value of this field can be a string, integer, or ISO date. Make sure to set the timeseries.timefield.auto.convert setting to instruct the connector to automatically convert the value to a date.

The following document shows the format of stock price data documents in the existing MongoDB collection:

{
  tx_time: '2021-07-12T05:20:35Z',
  symbol: 'WSV',
  company_name: 'WORRIED SNAKEBITE VENTURES',
  price: 21.22,
  _id: ObjectId("...")
}
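To illustrate the conversion that the timeseries.timefield.auto.convert setting performs, the following Python sketch parses a tx_time string with the same date pattern used later in this tutorial. This is only an illustration of the conversion, not the connector's actual implementation:

```python
from datetime import datetime, timezone

# Python equivalent of the Java-style pattern yyyy-MM-dd'T'HH:mm:ss'Z'
# used in the sink connector configuration later in this tutorial.
TX_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

def parse_tx_time(value: str) -> datetime:
    """Parse a tx_time string into a timezone-aware UTC datetime."""
    return datetime.strptime(value, TX_TIME_FORMAT).replace(tzinfo=timezone.utc)

parsed = parse_tx_time("2021-07-12T05:20:35Z")
print(parsed.isoformat())  # 2021-07-12T05:20:35+00:00
```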

For this scenario, assume you stored these documents in a collection named PriceData in the Stocks database.

You identify that the tx_time field distinguishes the time of the measurements, and specify it as your time field in your sink connector configuration.

Learn how to set your time field and field conversion in the time series configuration guide.

To copy data from the PriceData MongoDB collection and publish it to the marketdata.Stocks.PriceData Kafka topic, create a source connector with the following configuration:

{
  "name": "mongo-source-marketdata",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "publish.full.document.only": "true",
    "connection.uri": "<your connection uri>",
    "topic.prefix": "marketdata",
    "database": "Stocks",
    "collection": "PriceData",
    "copy.existing": "true"
  }
}
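One common way to start a connector with a configuration like this is to POST it to the Kafka Connect REST API. The following is a minimal Python sketch that assumes a Connect worker listening on localhost:8083 (a hypothetical endpoint; adjust the host, port, and connection URI for your deployment):

```python
import json
from urllib import request

# Assumed Kafka Connect worker REST endpoint; adjust for your deployment.
CONNECT_URL = "http://localhost:8083/connectors"

source_config = {
    "name": "mongo-source-marketdata",
    "config": {
        "tasks.max": "1",
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "publish.full.document.only": "true",
        "connection.uri": "<your connection uri>",
        "topic.prefix": "marketdata",
        "database": "Stocks",
        "collection": "PriceData",
        "copy.existing": "true",
    },
}

def build_register_request(config: dict, url: str = CONNECT_URL) -> request.Request:
    """Build the HTTP POST that registers a connector with Kafka Connect."""
    return request.Request(
        url,
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Against a live worker, submit the request:
# with request.urlopen(build_register_request(source_config)) as resp:
#     print(resp.status)
```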
Note

If you insert documents into a collection during the copying process, the connector inserts them after the process is complete.

After you start your source connector with the preceding configuration, the connector starts the copying process. Once the process is complete, you should see the following message in the log:

Finished copying existing data from the collection(s).

Your data from the PriceData MongoDB collection is now available in the marketdata.Stocks.PriceData Kafka topic.

To consume data from the marketdata.Stocks.PriceData Kafka topic and write it to a time series collection named StockDataMigrate in a database named Stocks, you can create a sink connector with the following configuration:

{
  "name": "mongo-sink-marketdata",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "marketdata.Stocks.PriceData",
    "connection.uri": "<your connection uri>",
    "database": "Stocks",
    "collection": "StockDataMigrate",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "timeseries.timefield": "tx_time",
    "timeseries.timefield.auto.convert": "true",
    "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
  }
}
Tip

The sink connector configuration above uses the time field date format converter. Alternatively, you can use the TimestampConverter Single Message Transform (SMT) to convert the tx_time field from a String to an ISODate. When using the TimestampConverter SMT, you must define a schema for the data in the Kafka topic.

For information on how to use the TimestampConverter SMT, see the TimestampConverter Confluent documentation.
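If you choose the SMT approach instead, the transform is configured through additional properties in the sink connector's "config" block. The following fragment is a sketch of that configuration; the transform alias "ConvertTxTime" is an arbitrary name, and the property keys follow the standard Kafka Connect SMT conventions:

```json
"transforms": "ConvertTxTime",
"transforms.ConvertTxTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertTxTime.target.type": "Timestamp",
"transforms.ConvertTxTime.field": "tx_time",
"transforms.ConvertTxTime.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
```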

After your sink connector finishes processing the topic data, the documents in the StockDataMigrate time series collection contain the tx_time field with an ISODate type value.

At this point, your time series collection should contain all the market data from your PriceData collection. The following shows the format of the documents in the StockDataMigrate time series collection:

{
  tx_time: ISODate('2021-07-12T20:05:35.000Z'),
  symbol: 'WSV',
  company_name: 'WORRIED SNAKEBITE VENTURES',
  price: 21.22,
  _id: ObjectId("...")
}

To learn how to verify a collection is of type timeseries, see the instructions on how to Check if a Collection is of Type Time Series in the MongoDB manual.
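One way to perform this check is to run the listCollections command and inspect the "type" field in the reply. The following Python helper sketches that check against a sample reply shaped like a real (trimmed) listCollections response; the sample data is illustrative, not output from a live deployment:

```python
# Hypothetical helper: given the reply from MongoDB's listCollections command
# (e.g. db.runCommand({listCollections: 1, filter: {name: "StockDataMigrate"}})
# in mongosh), check whether the named collection is a time series collection.
def is_time_series(list_collections_reply: dict, name: str) -> bool:
    for coll in list_collections_reply["cursor"]["firstBatch"]:
        if coll.get("name") == name and coll.get("type") == "timeseries":
            return True
    return False

# Sample reply, trimmed to the fields this check uses.
sample_reply = {
    "cursor": {
        "firstBatch": [
            {
                "name": "StockDataMigrate",
                "type": "timeseries",
                "options": {"timeseries": {"timeField": "tx_time"}},
            }
        ]
    }
}

print(is_time_series(sample_reply, "StockDataMigrate"))  # True
```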

© 2021 MongoDB, Inc.

About

  • Careers
  • Legal Notices
  • Privacy Notices
  • Security Information
  • Trust Center
© 2021 MongoDB, Inc.