Navigation

Getting Started

Source Code

For the source code that contains the examples below, see Introduction.scala.

Prerequisites

  • Basic working knowledge of MongoDB and Apache Spark. Refer to the MongoDB documentation and Spark documentation.
  • Running MongoDB instance (version 2.6 or later).
  • Spark 1.6.x.
  • Scala 2.10.x if using the mongo-spark-connector_2.10 package
  • Scala 2.11.x if using the mongo-spark-connector_2.11 package

Spark shell

This tutorial uses the Spark shell. When starting the Spark shell, you can specify:

  • the --packages option to download the MongoDB Spark Connector package. The following packages are available:

    • mongo-spark-connector_2.10 for use with Scala 2.10.x
    • mongo-spark-connector_2.11 for use with Scala 2.11.x
  • the --conf option to configure the MongoDB Spark Connnector. These settings configure the SparkConf object.

    Note

    When specifying the Connector configuration via SparkConf, you must prefix the settings appropriately. For details and other available MongoDB Spark Connector options, see the Configuration Options.

For example,

./bin/spark-shell --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
                  --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection" \
                  --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0
  • The spark.mongodb.input.uri specifies the MongoDB server address(127.0.0.1), the database to connect (test), and the collection (myCollection) from which to read data, and the read preference.
  • The spark.mongodb.output.uri specifies the MongoDB server address(127.0.0.1), the database to connect (test), and the collection (myCollection) to which to write data.

The examples in this tutorial will use this database and collection.

Tips

If you get a java.net.BindException: Can't assign requested address,

  • Check to ensure that you do not have another Spark shell already running.

  • Try setting the SPARK_LOCAL_IP environment variable; e.g.

    export SPARK_LOCAL_IP=127.0.0.1
    
  • Try including the following option when starting the Spark shell:

    --driver-java-options "-Djava.net.preferIPv4Stack=true"
    

If you have errors running the examples in this tutorial, you may need to clear your local ivy cache (~/.ivy2/cache/org.mongodb.spark and ~/.ivy2/jars).

Import the MongoDB Connector Package

To enable MongoDB Connector specific functions and implicits for the SparkContext and RDD (Resilient Distributed Dataset), specify the following import statement in the Spark shell:

import com.mongodb.spark._

Connect to MongoDB

Connection to MongoDB happens automatically when an RDD action requires a read from MongoDB or a write to MongoDB.

Considerations for Saving Data from RDD to MongoDB

BSON Document

When saving RDD data into MongoDB, the data must be convertible to a BSON document. You may need to include a map transformation to convert the data into a Document (or BsonDocument or a DBObject).

Unsupported Types

Some Scala types (e.g. Lists) are unsupported and should be converted to their Java equivalent. To convert from Scala into native types include the following import statement to use the .asJava method:

import scala.collection.JavaConverters._

Write to MongoDB

MongoSpark.save()

To write to MongoDB from RDD, you can use the MongoSpark.save() method.

For example, the following code saves 10 documents to the collection specified in the SparkConf; i.e. myCollection in the test database as specified in the spark.mongodb.output.uri setting when starting the Spark shell:

import org.bson.Document

val documents = sc.parallelize((1 to 10).map(i => Document.parse(s"{test: $i}")))

MongoSpark.save(documents) // Uses the SparkConf for configuration

To specify a different collection, database, and other write configuration settings, pass a WriteConfig to MongoSpark.save().

Specify WriteConfig

MongoSpark.save() can accept a WriteConfig object which specifies various write configuration settings, such as the collection or the write concern.

For example, the following code saves data to the spark collection with a majority write concern:

import com.mongodb.spark.config._

val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(sc)))
val sparkDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))
MongoSpark.save(sparkDocuments, writeConfig)

RDD Save Helper Methods

RDD has implicit helper method saveToMongoDB() to write data to MongoDB:

For example, the following uses the documents RDD defined above and uses its saveToMongoDB() method without any arguments to save the documents to the collection specified in the SparkConf:

documents.saveToMongoDB() // Uses the SparkConf for configuration

Call saveToMongoDB() with a WriteConfig object to specify a different MongoDB server address, database and collection. See write configuration settings for available settings:

documents.saveToMongoDB(WriteConfig(Map("uri" -> "mongodb://example.com/database.collection"))) // Uses the WriteConfig

Read and Analyze Data from MongoDB

MongoSpark.load()

Use the MongoSpark.load method to create an RDD representing a collection.

For example, the following code loads the collection specified in the SparkConf; i.e. myCollection in the test database as specified in the spark.mongodb.input.uri setting when starting the Spark shell:

val rdd = MongoSpark.load(sc)
println(rdd.count)
println(rdd.first.toJson)

To specify a different collection, database, and other read configuration settings, pass a ReadConfig to MongoSpark.load().

Specify ReadConfig

MongoSpark.load() can accept a ReadConfig object which specifies various read configuration settings, such as the collection or the read preference.

The following example reads from the spark collection with a secondaryPreferred ReadPreference:

import com.mongodb.spark.config._

val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc, readConfig)
println(customRdd.count)
println(customRdd.first.toJson)

SparkContext Load Helper Methods

SparkContext has implicit helper method loadFromMongoDB() to load data from MongoDB.

For example, use the loadFromMongoDB() method without any arguments to load the collection specified in the SparkConf:

sc.loadFromMongoDB() // Uses the SparkConf for configuration

Call loadFromMongoDB() with a ReadConfig object to specify a different MongoDB server address, database and collection. See input configuration settings for available settings:

sc.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection"))) // Uses the ReadConfig

Aggregation

In certain situations, using an aggregation pipeline may be more performant than the direct use of filters.

Filtering data may seem a simple RDD transformation but can be imperformant. The following example uses the rdd defined above and filters for all documents where the “test” field has a value greater than 5:

val filteredRdd = rdd.filter(doc => doc.getInteger("test") > 5)
println(filteredRdd.count)
println(filteredRdd.first.toJson)

A MongoRDD instance can be passed an aggregation pipeline which allows a user to filter data in MongoDB and then pass only the matching documents to Spark.

For example, the following code uses the aggregation pipeline to perform the same filter operation; namely, to filter all documents where the “test” field has a value greater than 5; however, only those matching documents are passed across the wire to Spark .

val aggregatedRdd = rdd.withPipeline(Seq(Document.parse("{ $match: { test : { $gt : 5 } } }")))
println(aggregatedRdd.count)
println(aggregatedRdd.first.toJson)

You can specify an valid aggregation pipeline.

The use of an aggregation pipeline also provides the benefit of handling null results whereas the filter method does not. If the filter does not match any documents, the operation throws ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 8) java.lang.NullPointerException.

MongoSpark.builder()

If you require granular control over your configuration, then the MongoSpark companion provides a builder() method for configuring all aspects of the Mongo Spark Connector. It also provides methods to create an RDD, DataFrame or Dataset.