Spark SQL¶
Source Code
For the source code that contains the examples below, see SparkSQL.scala.
The following tutorial uses the Spark Shell. For details on using the Spark Shell with the MongoDB Spark Connector, including specifying the database and collection to read and write, see Spark shell.
Prerequisites¶
- Basic working knowledge of MongoDB and Apache Spark. Refer to the MongoDB documentation and Spark documentation.
- Running MongoDB instance (version 2.6 or later).
- Spark 1.6.x.
- Scala 2.10.x if using the mongo-spark-connector_2.10 package, or Scala 2.11.x if using the mongo-spark-connector_2.11 package.
- Sample documents. Insert the following documents into a collection (see the example after this list).
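For example, the following sketch inserts a few illustrative name/age documents from the Spark shell, assuming the shell was started with spark.mongodb.output.uri pointing at the target database and collection:

```scala
import com.mongodb.spark._
import org.bson.Document

// Illustrative sample documents; any collection of { name, age } documents will do.
val docs = Seq(
  """{"name": "Bilbo Baggins", "age": 50}""",
  """{"name": "Gandalf", "age": 1000}""",
  """{"name": "Thorin", "age": 195}""",
  """{"name": "Balin", "age": 178}""",
  """{"name": "Bombur"}"""
)

// saveToMongoDB() writes to the collection configured via spark.mongodb.output.uri.
sc.parallelize(docs.map(Document.parse)).saveToMongoDB()
```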
You can call saveToMongoDB() with a WriteConfig object to specify a different database and collection. See RDD Save Helper Methods for an example.

Import the MongoDB Connector functions and implicits.
To enable MongoDB Connector specific functions and implicits for the SparkContext and RDD (Resilient Distributed Dataset), specify the following import statement in the Spark shell:
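For the 1.x connector this is typically the top-level package import:

```scala
import com.mongodb.spark._
```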
To enable MongoDB Connector specific functions and implicits for the SQLContext, specify the following import statement:
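The SQL-specific helpers and implicits live under the connector's sql package, so this is typically:

```scala
import com.mongodb.spark.sql._
```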
DataFrames and Datasets¶
The Mongo Spark Connector provides the com.mongodb.spark.sql.DefaultSource class that creates DataFrames and Datasets from MongoDB. However, to facilitate the creation of a DataFrame, the connector provides the MongoSpark helper load(sqlContext). MongoSpark.load(sqlContext) is shorthand for configuring and loading via the DataFrameReader.

For example, the following creates a DataFrame using MongoSpark.load(sqlContext) and prints the schema:
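A minimal sketch, assuming sqlContext is the Spark shell's SQLContext and spark.mongodb.input.uri points at the sample collection:

```scala
val df = MongoSpark.load(sqlContext)  // reads from the collection in spark.mongodb.input.uri
df.printSchema()
```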
The operation prints the following:
Note
By default, reading from MongoDB in a SQLContext
infers the
schema by sampling documents from the database. To explicitly
declare a schema, see Explicitly Declare a Schema.
Alternatively, you can use SQLContext
methods to create DataFrames:
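For instance, the data source can be named explicitly via the DataFrameReader, and the implicits from com.mongodb.spark.sql._ add a loadFromMongoDB() helper (the helper name is stated here as an assumption about the 1.x API):

```scala
// Uses the com.mongodb.spark.sql.DefaultSource data source directly.
val df2 = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()

// Uses the loadFromMongoDB() implicit helper added to the SQLContext.
val df3 = sqlContext.loadFromMongoDB()
```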
Filters¶
Note
When using filters
with DataFrames or Spark SQL, the underlying
Mongo Connector code constructs an aggregation pipeline to filter the data in MongoDB before
sending it to Spark.
The following example filters and outputs the characters with ages under 100:
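A sketch, reusing the df DataFrame loaded above:

```scala
df.filter(df("age") < 100).show()
```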
In the Spark Shell, the operation prints the following output:
Explicitly Declare a Schema¶
By default, reading from MongoDB in a SQLContext
infers the schema
by sampling documents from the collection. You can also use a case
class to define the schema explicitly, thus removing the extra queries
needed for sampling.
Note
If you provide a case class for the schema, MongoDB returns only the declared fields. This helps minimize the data sent across the wire.
Define Schema¶
The following example creates a Character case class and then uses it to define the schema for the DataFrame:
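A sketch, assuming the sample documents have name and age fields:

```scala
case class Character(name: String, age: Int)

val explicitDF = MongoSpark.load[Character](sqlContext)
explicitDF.printSchema()
```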
In the Spark Shell, the operation prints the following output:
Convert to DataSet¶
You can use the case class when converting the DataFrame
to a
Dataset
as in the following example:
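For example, assuming the Character case class and explicitDF from the previous example (the implicits import supplies the Dataset encoder for the case class):

```scala
import sqlContext.implicits._

val dataset = explicitDF.as[Character]
```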
Convert RDD to DataFrame and Dataset¶
The MongoRDD
class provides helpers to create DataFrames and
Datasets directly:
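A sketch of these helpers; the toDF and toDS method names are assumptions about the 1.x MongoRDD API:

```scala
val rdd = MongoSpark.load(sc)                // MongoRDD[Document]

val dfInferredSchema = rdd.toDF()            // infers the schema by sampling
val dfExplicitSchema = rdd.toDF[Character]() // uses the case class for the schema
val ds = rdd.toDS[Character]()               // creates a Dataset directly
```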
SQL Queries¶
Spark SQL works on top of DataFrames. To use SQL, you need to register a temporary table first, and then you can run SQL queries over the data.
The following example registers a characters
table and then queries
it to find all characters that are 100 or older:
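A sketch, reusing the Character case class defined earlier:

```scala
val characters = MongoSpark.load[Character](sqlContext)
characters.registerTempTable("characters")

val centenarians = sqlContext.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()
```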
Important
When querying the temporary table, you must use the same SQLContext that registers the table.
Save DataFrames to MongoDB¶
The MongoDB Spark Connector provides the ability to persist DataFrames to a collection in MongoDB.
The following example uses the MongoSpark.save(DataFrameWriter) method to save the centenarians DataFrame to the hundredClub collection in MongoDB and, to verify the save, reads from the hundredClub collection:
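A sketch, assuming the ReadConfig helpers from com.mongodb.spark.config and the centenarians DataFrame from the previous example:

```scala
import com.mongodb.spark.config._

// Saves the centenarians DataFrame to the hundredClub collection, overwriting it if it exists.
MongoSpark.save(centenarians.write.option("collection", "hundredClub").mode("overwrite"))

// Reads back from the hundredClub collection to verify the save.
MongoSpark.load[Character](sqlContext,
  ReadConfig(Map("collection" -> "hundredClub"), Some(ReadConfig(sqlContext)))).show()
```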
The DataFrameWriter includes the .mode("overwrite") option to overwrite the hundredClub collection if it already exists.
In the Spark Shell, the operation prints the following output:
MongoSpark.save(dataFrameWriter)
is shorthand for configuring and
saving via the DataFrameWriter. The following examples write DataFrames
to MongoDB using the DataFrameWriter directly:
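For example (the mongo() writer helper is assumed to come from the com.mongodb.spark.sql._ implicits; naming the DefaultSource format works in either case):

```scala
// Via the mongo() helper added by the com.mongodb.spark.sql._ implicits.
centenarians.write.option("collection", "hundredClub").mode("overwrite").mongo()

// Or by naming the data source class directly.
centenarians.write.option("collection", "hundredClub").mode("overwrite")
  .format("com.mongodb.spark.sql.DefaultSource").save()
```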
DataTypes¶
Spark supports a limited number of data types to ensure that all BSON types can be round tripped in and out of Spark DataFrames/Datasets. For any unsupported Bson Types, custom StructTypes are created.
The following table shows the mapping between the Bson Types and Spark Types:
| Bson Type | Spark Type |
| --- | --- |
| Document | StructType |
| Array | ArrayType |
| 32-bit integer | Integer |
| 64-bit integer | Long |
| Binary data | Array[Byte] or StructType: { subType: Byte, data: Array[Byte] } |
| Boolean | Boolean |
| Date | java.sql.Timestamp |
| DBPointer | StructType: { ref: String, oid: String } |
| Double | Double |
| JavaScript | StructType: { code: String } |
| JavaScript with scope | StructType: { code: String, scope: String } |
| Max key | StructType: { maxKey: Integer } |
| Min key | StructType: { minKey: Integer } |
| Null | null |
| ObjectId | StructType: { oid: String } |
| Regular Expression | StructType: { regex: String, options: String } |
| String | String |
| Symbol | StructType: { symbol: String } |
| Timestamp | StructType: { time: Integer, inc: Integer } |
| Undefined | StructType: { undefined: Boolean } |
Dataset support¶
To help better support Datasets, the following Scala case classes (
com.mongodb.spark.sql.fieldTypes
) and JavaBean classes (
com.mongodb.spark.sql.fieldTypes.api.java.
) have been created to
represent the unsupported BSON Types:
| Bson Type | Scala case class | JavaBean |
| --- | --- | --- |
| Binary data | Binary | Binary |
| DBPointer | DBPointer | DBPointer |
| JavaScript | JavaScript | JavaScript |
| JavaScript with scope | JavaScriptWithScope | JavaScriptWithScope |
| Max key | MaxKey | MaxKey |
| Min key | MinKey | MinKey |
| ObjectId | ObjectId | ObjectId |
| Regular Expression | RegularExpression | RegularExpression |
| Symbol | Symbol | Symbol |
| Timestamp | Timestamp | Timestamp |
| Undefined | Undefined | Undefined |
For convenience, all BSON Types can be represented as a String value as well. However, these values lose all their original type information and, if saved back to MongoDB, they would be stored as a String.
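For example, a sketch of a case class that keeps a document's ObjectId using the fieldTypes helpers (the field and class names here are assumptions):

```scala
import com.mongodb.spark.sql.fieldTypes.ObjectId

// Keeps the original _id as an ObjectId instead of dropping it from the schema.
case class CharacterWithId(_id: ObjectId, name: String, age: Int)

val withIds = MongoSpark.load[CharacterWithId](sqlContext)
withIds.printSchema()
```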