Datasets and SQL

Source Code

For the source code that contains the examples below, see SparkSQL.scala.

Getting Started

This tutorial works either as a self-contained Scala application or as individual commands in the Spark Shell.

The following application creates the SparkSession and inserts the sample documents into the characters collection:

package com.mongodb

object SparkSQL {

  def main(args: Array[String]): Unit = {

    import org.apache.spark.sql.SparkSession

    /* For Self-Contained Scala Apps: Create the SparkSession
     * CREATED AUTOMATICALLY IN spark-shell */
    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.characters")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.characters")
      .getOrCreate()

    import com.mongodb.spark._
    import com.mongodb.spark.config._
    import org.bson.Document

    val docs = """
      {"name": "Bilbo Baggins", "age": 50}
      {"name": "Gandalf", "age": 1000}
      {"name": "Thorin", "age": 195}
      {"name": "Balin", "age": 178}
      {"name": "Kíli", "age": 77}
      {"name": "Dwalin", "age": 169}
      {"name": "Óin", "age": 167}
      {"name": "Glóin", "age": 158}
      {"name": "Fíli", "age": 82}
      {"name": "Bombur"}""".trim.stripMargin.split("[\\r\\n]+").toSeq
    sparkSession.sparkContext.parallelize(docs.map(Document.parse)).saveToMongoDB()

    // Additional operations go here...

  }
}

DataFrames and Datasets

As of Spark 2.0, a DataFrame is a Dataset of Rows: DataFrame is a type alias for Dataset[Row].

The Mongo Spark Connector provides the com.mongodb.spark.sql.DefaultSource class that creates DataFrames and Datasets from MongoDB. Use the connector’s MongoSpark helper to facilitate the creation of a DataFrame:

val df = MongoSpark.load(sparkSession)  // Uses the SparkSession
df.printSchema()                        // Prints DataFrame schema

The operation prints the following:

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

Note

By default, reading from MongoDB in a SparkSession infers the schema by sampling documents from the collection. To explicitly declare a schema, see Explicitly Declare a Schema.

Alternatively, you can use SparkSession methods to create DataFrames:

import com.mongodb.spark.config._ // ReadConfig
import com.mongodb.spark.sql._    // Adds mongo() to DataFrameReader and DataFrameWriter

val df2 = sparkSession.loadFromMongoDB() // SparkSession used for configuration
val df3 = sparkSession.loadFromMongoDB(ReadConfig(
  Map("uri" -> "mongodb://example.com/database.collection")
)) // ReadConfig used for configuration

val df4 = sparkSession.read.mongo() // SparkSession used for configuration
val df5 = sparkSession.read.format("com.mongodb.spark.sql").load()

// Set custom options, falling back to the SparkSession configuration for everything else
val customReadConfig = ReadConfig(Map("readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sparkSession)))
val df6 = sparkSession.read.mongo(customReadConfig)

val df7 = sparkSession.read.format("com.mongodb.spark.sql").options(customReadConfig.asOptions).load()

Filters

Note

When using filters with DataFrames or Spark SQL, the underlying Mongo Connector code constructs an aggregation pipeline to filter the data in MongoDB before sending it to Spark.

The following example filters the DataFrame and outputs the characters with ages under 100:

df.filter(df("age") < 100).show()

The operation outputs the following:

+--------------------+---+-------------+
|                 _id|age|         name|
+--------------------+---+-------------+
|[5755d7b4566878c9...| 50|Bilbo Baggins|
|[5755d7b4566878c9...| 82|         Fíli|
|[5755d7b4566878c9...| 77|         Kíli|
+--------------------+---+-------------+
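To make the note above concrete, the following is a purely illustrative model of how a comparison filter could be translated into a $match stage. The PushedFilter types and the toMatchStage helper are hypothetical names invented for this sketch. They are not part of the connector's API, and the stage the connector actually sends may differ (for example, it may add null checks or a projection).

```scala
// Hypothetical, simplified model of filter pushdown; this is not the connector's code.
sealed trait PushedFilter
final case class LessThan(field: String, value: Int) extends PushedFilter
final case class GreaterThanOrEqual(field: String, value: Int) extends PushedFilter

object PipelineSketch {
  // Renders one filter as a MongoDB query clause in JSON.
  // Inside an s-interpolated string, "$$" produces a literal "$".
  private def clause(filter: PushedFilter): String = filter match {
    case LessThan(field, value)           => s"""{"$field": {"$$lt": $value}}"""
    case GreaterThanOrEqual(field, value) => s"""{"$field": {"$$gte": $value}}"""
  }

  // Combines all filters into a single $match stage using $and.
  def toMatchStage(filters: Seq[PushedFilter]): String =
    s"""{"$$match": {"$$and": [${filters.map(clause).mkString(", ")}]}}"""

  def main(args: Array[String]): Unit = {
    // Models df.filter(df("age") < 100).
    // Prints: {"$match": {"$and": [{"age": {"$lt": 100}}]}}
    println(toMatchStage(Seq(LessThan("age", 100))))
  }
}
```

The point of the sketch is only that the predicate is evaluated by MongoDB, so documents that fail it never reach Spark.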

Explicitly Declare a Schema

By default, reading from MongoDB in a SparkSession infers the schema by sampling documents from the collection. You can also use a case class to define the schema explicitly, thus removing the extra queries needed for sampling.

Note

If you provide a case class for the schema, MongoDB returns only the declared fields. This helps minimize the data sent across the wire.

The following statement creates a Character case class and then uses it to define the schema for the DataFrame:

case class Character(name: String, age: Int)

Important

For self-contained Scala applications, the Character class should be defined outside of the method using the class.

val explicitDF = MongoSpark.load[Character](sparkSession)
explicitDF.printSchema()

The operation prints the following output:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
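The nullable flags in this schema follow from the case class field types. As a sketch, Spark's own Encoders.product derivation shows the same shape and how an Option field changes it. It is used here only for illustration; the connector may derive its schema through a different code path. CharacterWithOptionalAge and SchemaSketch are hypothetical names introduced for this example.

```scala
import org.apache.spark.sql.Encoders

case class Character(name: String, age: Int)
// Hypothetical variant for documents that may lack an age, such as Bombur's.
case class CharacterWithOptionalAge(name: String, age: Option[Int])

object SchemaSketch {
  // Schemas that Spark derives from the case classes by reflection.
  val strict = Encoders.product[Character].schema
  val lenient = Encoders.product[CharacterWithOptionalAge].schema

  def main(args: Array[String]): Unit = {
    strict.printTreeString()  // age: integer (nullable = false)
    lenient.printTreeString() // age: integer (nullable = true)
  }
}
```

A primitive Int cannot hold a missing value, so an Option[Int] field may be a better fit when some documents omit the field.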

Convert to Dataset

You can use the case class when converting the DataFrame to a Dataset as in the following example:

import sparkSession.implicits._ // Supplies the Encoder[Character] that as[Character] requires
val dataset = explicitDF.as[Character]

Convert RDD to DataFrame and Dataset

The MongoRDD class provides helpers to convert an RDD to DataFrames and Datasets. The following example passes a SparkContext object to MongoSpark.load(), which returns an RDD, and then converts that RDD:

// Passing the SparkContext to load returns an RDD, not a DataFrame or Dataset
val rdd = MongoSpark.load(sparkSession.sparkContext)
val dfInferredSchema = rdd.toDF()
val dfExplicitSchema = rdd.toDF[Character]()
val ds = rdd.toDS[Character]()

SQL Queries

Before running SQL queries on your dataset, you must register a temporary view for the dataset.

The following operation registers a temporary view named characters and then queries it to find all characters that are 100 or older:

val characters = MongoSpark.load[Character](sparkSession)
characters.createOrReplaceTempView("characters")

val centenarians = sparkSession.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()

Save DataFrames to MongoDB

The MongoDB Spark Connector provides the ability to persist DataFrames to a collection in MongoDB.

The following example uses the MongoSpark.save(DataFrameWriter) method to save centenarians to the hundredClub collection in MongoDB. To verify the save, it then reads from the hundredClub collection:

MongoSpark.save(centenarians.write.option("collection", "hundredClub").mode("overwrite"))

println("Reading from the 'hundredClub' collection:")
MongoSpark.load[Character](sparkSession, ReadConfig(Map("collection" -> "hundredClub"), Some(ReadConfig(sparkSession)))).show()

The DataFrameWriter call includes .mode("overwrite"), which drops the hundredClub collection, if it already exists, before writing the results.

In the Spark Shell, the operation prints the following output:

+-------+----+
|   name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
|  Balin| 178|
| Dwalin| 169|
|    Óin| 167|
|  Glóin| 158|
+-------+----+

MongoSpark.save(dataFrameWriter) is shorthand for configuring and saving via the DataFrameWriter. The following examples write DataFrames to MongoDB using the DataFrameWriter directly:

centenarians.write.option("collection", "hundredClub").mode("overwrite").mongo()
centenarians.write.option("collection", "hundredClub").mode("overwrite").format("com.mongodb.spark.sql").save()

DataTypes

Spark supports a limited number of data types. To ensure that all BSON types can be round-tripped in and out of Spark DataFrames and Datasets, the connector creates custom StructTypes for any BSON types that Spark does not support natively.

The following table shows the mapping between the BSON types and Spark types:

BSON Type               Spark Type
----------------------  ------------------------------------------------------------------
Document                StructType
Array                   ArrayType
32-bit integer          Integer
64-bit integer          Long
Binary data             Array[Byte] or StructType: { subType: Byte, data: Array[Byte] }
Boolean                 Boolean
Date                    java.sql.Timestamp
DBPointer               StructType: { ref: String, oid: String }
Double                  Double
JavaScript              StructType: { code: String }
JavaScript with scope   StructType: { code: String, scope: String }
Max key                 StructType: { maxKey: Integer }
Min key                 StructType: { minKey: Integer }
Null                    null
ObjectId                StructType: { oid: String }
Regular Expression      StructType: { regex: String, options: String }
String                  String
Symbol                  StructType: { symbol: String }
Timestamp               StructType: { time: Integer, inc: Integer }
Undefined               StructType: { undefined: Boolean }

Dataset support

To better support Datasets, the connector provides the following Scala case classes (com.mongodb.spark.sql.fieldTypes) and JavaBean classes (com.mongodb.spark.sql.fieldTypes.api.java) to represent the unsupported BSON types:

BSON Type               Scala case class      JavaBean
----------------------  --------------------  --------------------
Binary data             Binary                Binary
DBPointer               DBPointer             DBPointer
JavaScript              JavaScript            JavaScript
JavaScript with scope   JavaScriptWithScope   JavaScriptWithScope
Max key                 MaxKey                MaxKey
Min key                 MinKey                MinKey
ObjectId                ObjectId              ObjectId
Regular Expression      RegularExpression     RegularExpression
Symbol                  Symbol                Symbol
Timestamp               Timestamp             Timestamp
Undefined               Undefined             Undefined

For convenience, all BSON types can also be represented as String values. However, these values lose all their original type information and, if saved back to MongoDB, are stored as Strings.
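As a sketch of the two options, the case classes below declare the _id field once with the connector's fieldTypes.ObjectId class and once as a plain String, and then print the schema that Spark's Encoders.product derives for each. CharacterWithId, CharacterWithStringId, and FieldTypesSketch are hypothetical names for this example, and the sketch assumes that fieldTypes.ObjectId is a case class with a single oid: String field, as the mapping table suggests.

```scala
import com.mongodb.spark.sql.fieldTypes
import org.apache.spark.sql.Encoders

// Hypothetical case classes; only fieldTypes.ObjectId comes from the connector.
case class CharacterWithId(_id: fieldTypes.ObjectId, name: String, age: Int)
case class CharacterWithStringId(_id: String, name: String, age: Int)

object FieldTypesSketch {
  // Schemas that Spark derives from the case classes by reflection.
  val typedSchema = Encoders.product[CharacterWithId].schema
  val stringSchema = Encoders.product[CharacterWithStringId].schema

  def main(args: Array[String]): Unit = {
    typedSchema.printTreeString()  // _id is a struct that wraps the oid string
    stringSchema.printTreeString() // _id is a plain string; the ObjectId type is lost
  }
}
```

Under the same assumptions, either class could be passed as the type parameter to MongoSpark.load. The typed variant is the one that keeps enough information to write the value back as an ObjectId.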