
Spark SQL

Source Code

For the source code that contains the examples below, see SparkSQL.scala.

The following tutorial uses the Spark Shell. For details on using the Spark Shell with the MongoDB Spark Connector, including specifying the database and collection to read and write, see Spark shell.

Prerequisites

  • Basic working knowledge of MongoDB and Apache Spark. Refer to the MongoDB documentation and Spark documentation.

  • A running MongoDB instance (version 2.6 or later).

  • Spark 1.6.x.

  • Scala 2.10.x if using the mongo-spark-connector_2.10 package

  • Scala 2.11.x if using the mongo-spark-connector_2.11 package

  • Sample documents. Insert the following documents into a collection.

    You can call saveToMongoDB() with a WriteConfig object to specify a different database and collection. See RDD Save Helper Methods for an example, or the short sketch at the end of this list.

    import com.mongodb.spark._
    import org.bson.Document
    
    val docs = """
      {"name": "Bilbo Baggins", "age": 50}
      {"name": "Gandalf", "age": 1000}
      {"name": "Thorin", "age": 195}
      {"name": "Balin", "age": 178}
      {"name": "Kíli", "age": 77}
      {"name": "Dwalin", "age": 169}
      {"name": "Óin", "age": 167}
      {"name": "Glóin", "age": 158}
      {"name": "Fíli", "age": 82}
      {"name": "Bombur"}""".trim.stripMargin.split("[\\r\\n]+").toSeq
    sc.parallelize(docs.map(Document.parse)).saveToMongoDB()
    
  • Import the MongoDB Connector functions and implicits.

    To enable MongoDB Connector specific functions and implicits for the SparkContext and RDD (Resilient Distributed Dataset), specify the following import statement in the Spark shell:

    import com.mongodb.spark._
    

    To enable MongoDB Connector specific functions and implicits for the SQLContext, specify the following import statement:

    import com.mongodb.spark.sql._
    
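If you want to insert the sample documents above into a different database or collection than the defaults from the SparkConf, you can pass a WriteConfig to saveToMongoDB(). The following is a minimal sketch; the collection name is a placeholder:

import com.mongodb.spark._
import com.mongodb.spark.config._
import org.bson.Document

// Placeholder collection name; unspecified settings fall back to the SparkConf
val writeConfig = WriteConfig(Map("collection" -> "characters"), Some(WriteConfig(sc)))
sc.parallelize(docs.map(Document.parse)).saveToMongoDB(writeConfig)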

Spark SQL

To create a basic SQLContext, you need a SparkContext.

For example:

import org.apache.spark.sql.SQLContext

val sqlContext = SQLContext.getOrCreate(sc)  // sc is an existing SparkContext

DataFrames and Datasets

The MongoDB Spark Connector provides the com.mongodb.spark.sql.DefaultSource class, which creates DataFrames and Datasets from MongoDB. To simplify creating a DataFrame, the connector also provides the MongoSpark.load(sqlContext) helper, which is shorthand for configuring and loading via the DataFrameReader.

For example, the following creates a DataFrame using MongoSpark.load(sqlContext) and prints the schema:

val df = MongoSpark.load(sqlContext)  // Uses the SparkConf
df.printSchema()

The operation prints the following output:

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

Note

By default, reading from MongoDB in a SQLContext infers the schema by sampling documents from the database. To explicitly declare a schema, see Explicitly Declare a Schema.

Alternatively, you can use SQLContext methods to create DataFrames:

val df2 = sqlContext.loadFromMongoDB() // Uses the SparkConf for configuration
val df3 = sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection"))) // Uses the ReadConfig

val df4 = sqlContext.read.mongo()
sqlContext.read.format("com.mongodb.spark.sql").load()

// Set custom options
import com.mongodb.spark.config._

val customReadConfig = ReadConfig(Map("readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val df5 = sqlContext.read.mongo(customReadConfig)

val df6 = sqlContext.read.format("com.mongodb.spark.sql").options(customReadConfig.asOptions).load()

Filters

Note

When using filters with DataFrames or Spark SQL, the underlying Mongo Connector code constructs an aggregation pipeline to filter the data in MongoDB before sending it to Spark.

The following example filters and outputs the characters with ages under 100:

df.filter(df("age") < 100).show()

In the Spark Shell, the operation prints the following output:

+--------------------+---+-------------+
|                 _id|age|         name|
+--------------------+---+-------------+
|[5755d7b4566878c9...| 50|Bilbo Baggins|
|[5755d7b4566878c9...| 82|         Fíli|
|[5755d7b4566878c9...| 77|         Kíli|
+--------------------+---+-------------+
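
For reference, the note above can be made concrete: a filter such as age < 100 corresponds roughly to a $match stage in the aggregation pipeline that the connector builds. The following sketch expresses that stage explicitly through the RDD API's withPipeline helper; the pipeline shown approximates, but is not necessarily identical to, what the connector generates:

import com.mongodb.spark._
import org.bson.Document

// Apply an explicit $match stage so MongoDB filters the documents server-side
val underage = MongoSpark.load(sc)
  .withPipeline(Seq(Document.parse("{ $match: { age: { $lt: 100 } } }")))
underage.collect().foreach(println)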

Explicitly Declare a Schema

By default, reading from MongoDB in a SQLContext infers the schema by sampling documents from the collection. You can also use a case class to define the schema explicitly, thus removing the extra queries needed for sampling.

Note

If you provide a case class for the schema, MongoDB returns only the declared fields. This helps minimize the data sent across the wire.

Define Schema

The following example creates a Character case class and then uses it to define the schema for the DataFrame:

case class Character(name: String, age: Int)
val explicitDF = MongoSpark.load[Character](sqlContext)
explicitDF.printSchema()

In the Spark Shell, the operation prints the following output:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

Convert to Dataset

You can use the case class when converting the DataFrame to a Dataset as in the following example:

val dataset = explicitDF.as[Character]
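
Once you have the Dataset, you can work with the Character fields directly through typed transformations. A minimal sketch (assuming the Spark shell, where the necessary implicits are already in scope):

// Typed filter: the predicate uses the Character case class fields
dataset.filter(_.age >= 200).show()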

Convert RDD to DataFrame and Dataset

The MongoRDD class provides helpers to create DataFrames and Datasets directly:

val rdd = MongoSpark.load(sc)
val dfInferredSchema = rdd.toDF()
val dfExplicitSchema = rdd.toDF[Character]()
val ds = rdd.toDS[Character]()

SQL Queries

Spark SQL works on top of DataFrames. To use SQL, you need to register a temporary table first, and then you can run SQL queries over the data.

The following example registers a characters table and then queries it to find all characters that are 100 or older:

Important

When querying the temporary table, you must use the same SQLContext that was used to register the table.

val characters = MongoSpark.load(sqlContext).toDF[Character]()
characters.registerTempTable("characters")

val centenarians = sqlContext.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()
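
For comparison, the same query can be expressed with the DataFrame API instead of SQL. A minimal sketch using the characters DataFrame defined above:

// Equivalent to the SQL query above, using DataFrame operations
characters.filter(characters("age") >= 100).select("name", "age").show()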

Save DataFrames to MongoDB

The MongoDB Spark Connector provides the ability to persist DataFrames to a collection in MongoDB.

The following example uses the MongoSpark.save(DataFrameWriter) method to save the centenarians DataFrame to the hundredClub collection in MongoDB and then, to verify the save, reads back from the hundredClub collection:

MongoSpark.save(centenarians.write.option("collection", "hundredClub").mode("overwrite"))

println("Reading from the 'hundredClub' collection:")
MongoSpark.load[Character](sqlContext, ReadConfig(Map("collection" -> "hundredClub"), Some(ReadConfig(sqlContext)))).show()

The DataFrameWriter includes .mode("overwrite"), which overwrites the hundredClub collection if it already exists.

In the Spark Shell, the operation prints the following output:

+-------+----+
|   name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
|  Balin| 178|
| Dwalin| 169|
|    Óin| 167|
|  Glóin| 158|
+-------+----+

MongoSpark.save(dataFrameWriter) is shorthand for configuring and saving via the DataFrameWriter. The following examples write DataFrames to MongoDB using the DataFrameWriter directly:

centenarians.write.option("collection", "hundredClub").mode("overwrite").mongo()
centenarians.write.option("collection", "hundredClub").mode("overwrite").format("com.mongodb.spark.sql").save()
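
To write to a different database as well as a different collection, the same options-based approach applies. The following sketch uses placeholder database and collection names:

// Placeholder database and collection names; adjust for your deployment
centenarians.write
  .option("database", "archive")
  .option("collection", "hundredClub")
  .mode("overwrite")
  .format("com.mongodb.spark.sql")
  .save()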

DataTypes

Spark supports only a limited set of data types. To ensure that all BSON types can be round-tripped in and out of Spark DataFrames/Datasets, the connector maps unsupported BSON types to custom StructTypes.

The following table shows the mapping between BSON types and Spark types:

BSON type               Spark type
Document                StructType
Array                   ArrayType
32-bit integer          Integer
64-bit integer          Long
Binary data             Array[Byte] or StructType: { subType: Byte, data: Array[Byte] }
Boolean                 Boolean
Date                    java.sql.Timestamp
DBPointer               StructType: { ref: String, oid: String }
Double                  Double
JavaScript              StructType: { code: String }
JavaScript with scope   StructType: { code: String, scope: String }
Max key                 StructType: { maxKey: Integer }
Min key                 StructType: { minKey: Integer }
Null                    null
ObjectId                StructType: { oid: String }
Regular Expression      StructType: { regex: String, options: String }
String                  String
Symbol                  StructType: { symbol: String }
Timestamp               StructType: { time: Integer, inc: Integer }
Undefined               StructType: { undefined: Boolean }

Dataset support

To better support Datasets, the following Scala case classes (in the com.mongodb.spark.sql.fieldTypes package) and JavaBean classes (in the com.mongodb.spark.sql.fieldTypes.api.java package) have been created to represent the unsupported BSON types:

BSON type               Scala case class        JavaBean
Binary data             Binary                  Binary
DBPointer               DBPointer               DBPointer
JavaScript              JavaScript              JavaScript
JavaScript with scope   JavaScriptWithScope     JavaScriptWithScope
Max key                 MaxKey                  MaxKey
Min key                 MinKey                  MinKey
ObjectId                ObjectId                ObjectId
Regular Expression      RegularExpression       RegularExpression
Symbol                  Symbol                  Symbol
Timestamp               Timestamp               Timestamp
Undefined               Undefined               Undefined

For convenience, all BSON Types can be represented as a String value as well. However, these values lose all their original type information and, if saved back to MongoDB, they would be stored as a String.
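
To illustrate the field types above, the following sketch (the case class name is illustrative) keeps the _id field as an ObjectId, rather than falling back to its String representation, when working with a Dataset:

import com.mongodb.spark.sql.fieldTypes.ObjectId

// Illustrative case class: _id maps to the connector's ObjectId field type
case class CharacterWithId(_id: ObjectId, name: String, age: Int)

val charactersDS = MongoSpark.load[CharacterWithId](sqlContext).as[CharacterWithId]
charactersDS.printSchema()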