Getting Started¶
Source Code
For the source code that contains the examples below, see Introduction.scala.
Prerequisites¶
- Basic working knowledge of MongoDB and Apache Spark. Refer to the MongoDB documentation and Spark documentation.
- Running MongoDB instance (version 2.6 or later).
- Spark 1.6.x.
- Scala 2.10.x if using the mongo-spark-connector_2.10 package.
- Scala 2.11.x if using the mongo-spark-connector_2.11 package.
Spark shell¶
This tutorial uses the Spark shell. When starting the Spark shell, you can specify:

- the --packages option to download the MongoDB Spark Connector package. The following packages are available: mongo-spark-connector_2.10 for use with Scala 2.10.x, and mongo-spark-connector_2.11 for use with Scala 2.11.x.
- the --conf option to configure the MongoDB Spark Connector. These settings configure the SparkConf object.

Note
When specifying the Connector configuration via SparkConf, you must prefix the settings appropriately. For details and other available MongoDB Spark Connector options, see the Configuration Options.
For example, you might start the Spark shell with a command like the one shown after this list, where:

- The spark.mongodb.input.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), the collection (myCollection) from which to read data, and the read preference.
- The spark.mongodb.output.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), and the collection (myCollection) to which to write data.
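A command along those lines might look like the following; the connector version shown is only an example, so substitute the release that matches your Spark and Scala versions:

```sh
./bin/spark-shell --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
                  --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection" \
                  --packages org.mongodb.spark:mongo-spark-connector_2.11:1.1.0
```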
The examples in this tutorial will use this database and collection.
Tips¶
If you get a java.net.BindException: Can't assign requested address:

- Check to ensure that you do not have another Spark shell already running.
- Try setting the SPARK_LOCAL_IP environment variable.
- Try including an extra JVM option when starting the Spark shell, as in the sketch after this list.
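Both workarounds are sketched below; the loopback address and the IPv4 preference flag are assumed example values, so adjust them for your environment:

```sh
# Assumed example: bind Spark to the loopback address.
export SPARK_LOCAL_IP=127.0.0.1

# Assumed example: prefer the IPv4 stack when starting the Spark shell.
./bin/spark-shell --driver-java-options "-Djava.net.preferIPv4Stack=true"
```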
If you have errors running the examples in this tutorial, you may need to clear your local ivy cache (~/.ivy2/cache/org.mongodb.spark and ~/.ivy2/jars).
Import the MongoDB Connector Package¶
To enable MongoDB Connector specific functions and implicits for the SparkContext and RDD (Resilient Distributed Dataset), specify the following import statement in the Spark shell.
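The import is the connector's top-level package:

```scala
import com.mongodb.spark._
```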
Connect to MongoDB¶
Connection to MongoDB happens automatically when an RDD action requires a read from MongoDB or a write to MongoDB.
Considerations for Saving Data from RDD to MongoDB¶
BSON Document¶
When saving RDD data into MongoDB, the data must be convertible to a BSON document. You may need to include a map transformation to convert the data into a Document (or BsonDocument or a DBObject).
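As a hypothetical sketch, an RDD of tuples could be mapped into Document instances before being saved; the field names and values here are illustrative:

```scala
import org.bson.Document

// Convert an RDD of (name, age) tuples into BSON documents with a map transformation.
val people = sc.parallelize(Seq(("Alice", 30), ("Bob", 25)))
val peopleDocs = people.map { case (name, age) => new Document("name", name).append("age", age) }
```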
Unsupported Types¶
Some Scala types (e.g. Lists) are unsupported and should be converted to their Java equivalent. To convert from Scala types to their Java equivalents, include the following import statement and use the .asJava method.
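The conversion methods come from scala.collection.JavaConverters; the Document field below is an illustrative example:

```scala
import scala.collection.JavaConverters._
import org.bson.Document

// A Scala List must be converted with .asJava before it is stored in a Document field.
val doc = new Document("name", "Alice").append("hobbies", List("hiking", "chess").asJava)
```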
Write to MongoDB¶
MongoSpark.save()¶
To write to MongoDB from an RDD, you can use the MongoSpark.save() method. For example, the following code saves 10 documents to the collection specified in the SparkConf; i.e. myCollection in the test database as specified in the spark.mongodb.output.uri setting when starting the Spark shell.
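A sketch of that code; the documents RDD and its test field are reused by later examples in this tutorial:

```scala
import org.bson.Document

// Create 10 documents of the form {test: <n>} and save them via the output URI settings.
val documents = sc.parallelize((1 to 10).map(i => Document.parse(s"{test: $i}")))
MongoSpark.save(documents) // Uses the SparkConf for configuration
```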
To specify a different collection, database, and other write configuration settings, pass a WriteConfig to MongoSpark.save().
Specify WriteConfig¶
MongoSpark.save() can accept a WriteConfig object, which specifies various write configuration settings, such as the collection or the write concern. For example, the following code saves data to the spark collection with a majority write concern.
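One way to express that, reusing the SparkContext defaults for any unspecified settings (the spark field name is illustrative):

```scala
import com.mongodb.spark.config._
import org.bson.Document

// Override the collection and write concern for this save only; other settings
// fall back to the defaults derived from the SparkContext.
val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(sc)))
val sparkDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))
MongoSpark.save(sparkDocuments, writeConfig)
```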
RDD Save Helper Methods¶
RDDs have an implicit helper method, saveToMongoDB(), to write data to MongoDB; it becomes available through the com.mongodb.spark._ import shown earlier.
For example, the following uses the documents RDD defined above and calls its saveToMongoDB() method without any arguments to save the documents to the collection specified in the SparkConf.
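That call might look like the following:

```scala
documents.saveToMongoDB() // Uses the SparkConf for configuration
```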
Call saveToMongoDB() with a WriteConfig object to specify a different MongoDB server address, database, and collection. See write configuration settings for available settings.
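A sketch of such a call; the URI below is a placeholder, so substitute your own server, database, and collection:

```scala
import com.mongodb.spark.config._

documents.saveToMongoDB(WriteConfig(Map("uri" -> "mongodb://example.com/database.collection")))
```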
Read and Analyze Data from MongoDB¶
MongoSpark.load()¶
Use the MongoSpark.load method to create an RDD representing a collection. For example, the following code loads the collection specified in the SparkConf; i.e. myCollection in the test database as specified in the spark.mongodb.input.uri setting when starting the Spark shell.
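A sketch of that load, assigning the result to rdd for use in the later examples:

```scala
val rdd = MongoSpark.load(sc)

println(rdd.count)
println(rdd.first.toJson)
```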
To specify a different collection, database, and other read configuration settings, pass a ReadConfig to MongoSpark.load().
Specify ReadConfig¶
MongoSpark.load() can accept a ReadConfig object, which specifies various read configuration settings, such as the collection or the read preference. The following example reads from the spark collection with a secondaryPreferred ReadPreference.
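One way to express that, assuming the spark collection from the write examples above:

```scala
import com.mongodb.spark.config._

// Override the collection and read preference for this load only; other settings
// fall back to the defaults derived from the SparkContext.
val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc, readConfig)

println(customRdd.count)
println(customRdd.first.toJson)
```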
SparkContext Load Helper Methods¶
SparkContext has an implicit helper method, loadFromMongoDB(), to load data from MongoDB.
For example, use the loadFromMongoDB() method without any arguments to load the collection specified in the SparkConf.
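That call might look like the following:

```scala
sc.loadFromMongoDB() // Uses the SparkConf for configuration
```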
Call loadFromMongoDB() with a ReadConfig object to specify a different MongoDB server address, database, and collection. See input configuration settings for available settings.
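A sketch of such a call; the URI below is a placeholder, so substitute your own server, database, and collection:

```scala
import com.mongodb.spark.config._

sc.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection")))
```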
Aggregation¶
In certain situations, using an aggregation pipeline may be more performant than the direct use of filters.
Filtering data may seem like a simple RDD transformation, but it can perform poorly. The following example uses the rdd defined above and filters for all documents where the “test” field has a value greater than 5.
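A sketch of that filter, assuming the test field holds integer values as in the save example above:

```scala
// Every document is pulled into Spark before the predicate is applied.
val filteredRdd = rdd.filter(doc => doc.getInteger("test") > 5)

println(filteredRdd.count)
println(filteredRdd.first.toJson)
```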
A MongoRDD instance can be passed an aggregation pipeline, which allows a user to filter data in MongoDB and then pass only the matching documents to Spark.
For example, the following code uses an aggregation pipeline to perform the same filter operation; namely, to filter all documents where the “test” field has a value greater than 5. However, only the matching documents are passed across the wire to Spark.
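A sketch of that pipeline, using the MongoRDD withPipeline method:

```scala
// The $match stage runs in MongoDB, so only matching documents reach Spark.
val aggregatedRdd = rdd.withPipeline(Seq(Document.parse("{ $match: { test : { $gt : 5 } } }")))

println(aggregatedRdd.count)
println(aggregatedRdd.first.toJson)
```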
You can specify any valid aggregation pipeline.
The use of an aggregation pipeline also provides the benefit of handling null results, whereas the filter method does not. If the filter does not match any documents, the operation throws:

ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 8)
java.lang.NullPointerException
MongoSpark.builder()¶
If you require granular control over your configuration, the MongoSpark companion object provides a builder() method for configuring all aspects of the MongoDB Spark Connector. It also provides methods to create an RDD, DataFrame, or Dataset.
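A hedged sketch, assuming the builder exposes sparkContext and readConfig setters and that build() returns a MongoSpark instance with a toRDD() method; verify the setter names against your connector version:

```scala
// Assumed builder usage; reuses the readConfig defined in the Specify ReadConfig section above.
val mongoSpark = MongoSpark.builder()
  .sparkContext(sc)
  .readConfig(readConfig)
  .build()

val builderRdd = mongoSpark.toRDD()
println(builderRdd.count)
```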