
Migrating Existing Collections to a Time Series Collection

On this page

  • Example
  • Source Configuration
  • Sink Configuration
  • Output

In this guide, you can learn how to migrate the data in an existing MongoDB collection to a time-series collection using the MongoDB Connector for Apache Kafka.

The example on this page uses an existing MongoDB collection in which each document represents stock data in the following format:

{ tx_time: (STRING), company_symbol: (STRING), company_name: (STRING), price: (DECIMAL) }
Important

A time-series collection requires a time field that contains Date values. In our example, we convert the tx_time field from a String to a Date.
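The conversion itself amounts to parsing each tx_time string as a UTC timestamp. The following Python sketch reproduces that parsing outside of Kafka, assuming strings in the yyyy-MM-ddTHH:mm:ssZ format (for example, 2021-07-12T20:05:35Z). The parse_tx_time helper and its strptime pattern are illustrative assumptions; the connector itself is configured with a Java-style date pattern, not this code.

```python
from datetime import datetime, timezone

# Assumed Python equivalent of the pattern yyyy-MM-dd'T'HH:mm:ss'Z':
# the 'T' and the trailing 'Z' are matched as literal characters.
TX_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def parse_tx_time(value: str) -> datetime:
    """Parse a tx_time string such as '2021-07-12T20:05:35Z' into a UTC datetime (hypothetical helper)."""
    # strptime returns a naive datetime; the literal 'Z' means UTC, so attach that zone explicitly.
    return datetime.strptime(value, TX_TIME_FORMAT).replace(tzinfo=timezone.utc)


if __name__ == "__main__":
    print(parse_tx_time("2021-07-12T20:05:35Z").isoformat())  # 2021-07-12T20:05:35+00:00
```

A string that does not match the pattern raises ValueError, which is a useful signal that the date format setting needs adjusting before you run the migration.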

To copy the existing data, we set the copy.existing parameter to true in the source connector. The connector then creates insert events for all existing documents in the source collection. If documents are inserted into the collection while the copy is in progress, the connector publishes them after the copy completes. To see when the copy finishes, check the connector logs for the following message:

Finished copying existing data from the collection(s).
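If you want a script to wait for the copy to finish, you can scan the Kafka Connect worker log for that message. This Python sketch assumes the message appears verbatim on a single log line; copy_finished_line is a hypothetical helper, and the sample lines are illustrative, not real connector output.

```python
from typing import Iterable, Optional

# Log message the source connector writes when the copy.existing phase completes.
COPY_FINISHED_MESSAGE = "Finished copying existing data from the collection(s)."


def copy_finished_line(log_lines: Iterable[str]) -> Optional[int]:
    """Return the 1-based number of the first line containing the message, or None if absent."""
    for number, line in enumerate(log_lines, start=1):
        if COPY_FINISHED_MESSAGE in line:
            return number
    return None


if __name__ == "__main__":
    # Illustrative lines only; real log lines carry timestamps and logger names.
    sample = [
        "INFO some other log line",
        "INFO Finished copying existing data from the collection(s).",
    ]
    print(copy_finished_line(sample))  # 2
```

To scan a real log, pass the file object opened in a `with open(path, encoding="utf-8") as f:` block, where path is wherever your worker writes its logs.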

The following configuration sets up the source connector:

{
  "name": "mongo-source-stockdata",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "publish.full.document.only": "true",
    "connection.uri": "<MONGODB SOURCE CONNECTION STRING>",
    "topic.prefix": "stockdata",
    "database": "Stocks",
    "collection": "StockData",
    "copy.existing": "true"
  }
}
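A configuration like this one is typically submitted to a Kafka Connect worker through the Connect REST API, which accepts POST /connectors with a JSON body containing name and config. The following Python sketch builds that request using only the standard library. It assumes a worker listening on http://localhost:8083 (the default Connect REST port), keeps the connection string as a placeholder, and build_connector_request is a hypothetical helper.

```python
import json
import urllib.request


def build_connector_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build a POST /connectors request for the Kafka Connect REST API (hypothetical helper)."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Mirrors the source configuration on this page; the connection string is a placeholder.
source_config = {
    "tasks.max": "1",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "publish.full.document.only": "true",
    "connection.uri": "<MONGODB SOURCE CONNECTION STRING>",
    "topic.prefix": "stockdata",
    "database": "Stocks",
    "collection": "StockData",
    "copy.existing": "true",
}

if __name__ == "__main__":
    request = build_connector_request("http://localhost:8083", "mongo-source-stockdata", source_config)
    print(request.get_method(), request.full_url)  # POST http://localhost:8083/connectors
    # To actually submit it to a running worker (assumed to be reachable):
    # with urllib.request.urlopen(request) as response:
    #     print(response.status, response.read().decode())
```

The same helper works for the sink connector; only the name and the config dictionary change.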

Because tx_time is a String, we could use a Single Message Transform (SMT) to convert it into a Date in the sink connector. However, SMTs like TimestampConverter require us to define a schema for the data in the Kafka topic, which adds complexity to the configuration.

Instead of using an SMT, we can have the sink connector perform the conversion automatically by using the timeseries.timefield.auto.convert and timeseries.timefield.auto.convert.date.format settings.

The following sink configuration converts the tx_time field with the string format yyyy-MM-ddTHH:mm:ssZ into a Date:

{
  "name": "mongo-sink-stockdata",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "stockdata.Stocks.StockData",
    "connection.uri": "<MONGODB SINK CONNECTION STRING>",
    "database": "Stocks",
    "collection": "StockDataMigrate",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "timeseries.timefield": "tx_time",
    "timeseries.timefield.auto.convert": "true",
    "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
  }
}
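The sink's topics value must match the topic the source connector publishes to. With default naming, the source connector names its topic from the prefix, database, and collection joined by dots, which is why the sink reads stockdata.Stocks.StockData. The following Python sketch checks that relationship; it assumes default topic naming (dot separator, no suffix or namespace mapping), and both helpers are hypothetical.

```python
def source_topic(source_config: dict) -> str:
    """Return the topic the source connector publishes to, under the assumed default naming."""
    # Join prefix, database, and collection with dots; skip the prefix if it is not set.
    parts = [
        source_config.get("topic.prefix", ""),
        source_config["database"],
        source_config["collection"],
    ]
    return ".".join(part for part in parts if part)


def sink_reads_source(source_config: dict, sink_config: dict) -> bool:
    """Return True if the sink's comma-separated "topics" list includes the source topic."""
    topics = [topic.strip() for topic in sink_config.get("topics", "").split(",")]
    return source_topic(source_config) in topics


if __name__ == "__main__":
    source = {"topic.prefix": "stockdata", "database": "Stocks", "collection": "StockData"}
    sink = {"topics": "stockdata.Stocks.StockData"}
    print(source_topic(source))  # stockdata.Stocks.StockData
    print(sink_reads_source(source, sink))  # True
```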

After configuring both the source and sink connectors as shown above, the tx_time field in the StockDataMigrate collection is stored as a Date (ISODate) value.

The following document shows the resulting format of the documents in the collection:

{
tx_time: 2021-07-12T20:05:35.000Z,
company_symbol: 'WSV',
company_name: 'WORRIED SNAKEBITE VENTURES',
price: 21.22,
_id: ObjectId("60eca08f83736ba32c079324")
}