Spark Connector Java API¶
Source Code
For the source code that contains the examples below, see JavaIntroduction.java.
Prerequisites¶
- Basic working knowledge of MongoDB and Apache Spark. Refer to the MongoDB documentation and Spark documentation.
- Running MongoDB instance (version 2.6 or later).
- Spark 1.6.x.
- Scala 2.10.x if using the mongo-spark-connector_2.10 package.
- Scala 2.11.x if using the mongo-spark-connector_2.11 package.
The Java API Basics¶
To facilitate the interaction between MongoDB and Spark, the MongoDB Spark Connector provides the com.mongodb.spark.api.java.MongoSpark helper.
For the configuration classes, use the Java-friendly create methods instead of the native Scala apply methods.
JavaSparkContext¶
The Java API provides a JavaSparkContext that takes a SparkConf object.
Note
When specifying the Connector configuration via SparkConf, you must prefix the settings appropriately. For details and other available MongoDB Spark Connector options, see the Configuration Options.

- The spark.mongodb.input.uri specifies the MongoDB server address (127.0.0.1), the database to connect (test), the collection (myCollection) from which to read data, and the read preference.
- The spark.mongodb.output.uri specifies the MongoDB server address (127.0.0.1), the database to connect (test), and the collection (myCollection) to which to write data.
The examples in this tutorial use this database and collection.
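A minimal sketch of this setup follows; the local master URL and application name are illustrative assumptions, not required values:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public final class JavaIntroduction {
    public static void main(final String[] args) {
        // Prefix the connector settings with "spark.mongodb." as described above
        SparkConf conf = new SparkConf()
                .setMaster("local")                       // assumption: local mode for the tutorial
                .setAppName("MongoSparkConnectorIntro")   // assumption: illustrative app name
                .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred")
                .set("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection");

        JavaSparkContext jsc = new JavaSparkContext(conf);

        // ... the examples from the following sections go here ...

        jsc.close();
    }
}
```

The later examples assume a JavaSparkContext named jsc configured this way.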
Write to MongoDB¶
Note
When saving RDD data into MongoDB, the data must be convertible to a BSON document. You may need to include a map transformation to convert the data into a Document (or BsonDocument or a DBObject).
The following code example saves 10 documents to the MongoDB collection specified in the JavaSparkContext object:
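A sketch along those lines, assuming the jsc context from the setup above, Java 8 lambdas, and an illustrative { test: &lt;n&gt; } document shape:

```java
import static java.util.Arrays.asList;

import org.apache.spark.api.java.JavaRDD;
import org.bson.Document;

import com.mongodb.spark.api.java.MongoSpark;

// Build an RDD of 10 BSON documents, e.g. { "test" : 1 } through { "test" : 10 }
JavaRDD<Document> documents = jsc
        .parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
        .map(i -> Document.parse("{ test: " + i + " }"));

// Save to the collection specified by spark.mongodb.output.uri
MongoSpark.save(documents);
```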
Specify a WriteConfig¶
MongoSpark.save() can accept a WriteConfig object, which specifies various write configuration settings, such as the collection or the write concern.
For example, the following code saves data to the spark collection with a majority write concern:
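A sketch under the same assumptions (the jsc context and documents RDD from above); the option keys follow the Configuration Options, and the exact create/withOptions signatures should be treated as assumptions:

```java
import java.util.HashMap;
import java.util.Map;

import com.mongodb.spark.config.WriteConfig;

// Override the collection and write concern from the default output configuration
Map<String, String> writeOverrides = new HashMap<String, String>();
writeOverrides.put("collection", "spark");
writeOverrides.put("writeConcern.w", "majority");
WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);

// Save the RDD using the custom WriteConfig
MongoSpark.save(documents, writeConfig);
```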
Read Data from MongoDB¶
You can pass a JavaSparkContext or a SQLContext to the MongoSpark#load method for easy reading from MongoDB into a JavaRDD.
The following example loads the data we previously saved into the “coll” collection in the “test” database.
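A sketch, again assuming the jsc context configured earlier; the JavaMongoRDD import path is an assumption based on the connector's Java API:

```java
import org.bson.Document;

import com.mongodb.spark.api.java.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

// Load from the collection specified by spark.mongodb.input.uri
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);

System.out.println(rdd.count());
System.out.println(rdd.first().toJson());
```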
Specify a ReadConfig¶
MongoSpark.load() can accept a ReadConfig object, which specifies various read configuration settings, such as the collection or the read preference.
The following example reads from the spark collection with a secondaryPreferred read preference:
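A sketch mirroring the WriteConfig example above; the option keys and create/withOptions calls are assumptions drawn from the Configuration Options:

```java
import java.util.HashMap;
import java.util.Map;

import com.mongodb.spark.config.ReadConfig;

// Override the collection and read preference from the default input configuration
Map<String, String> readOverrides = new HashMap<String, String>();
readOverrides.put("collection", "spark");
readOverrides.put("readPreference.name", "secondaryPreferred");
ReadConfig readConfig = ReadConfig.create(jsc).withOptions(readOverrides);

JavaMongoRDD<Document> customRdd = MongoSpark.load(jsc, readConfig);

System.out.println(customRdd.count());
System.out.println(customRdd.first().toJson());
```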
Aggregation¶
A JavaMongoRDD instance can be passed an aggregation pipeline, which allows a user to filter data in MongoDB and then pass only the matching documents to Spark.
The following example filters for documents where the “test” field has a value greater than 5, so that only those matching documents are passed to Spark.
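A sketch, reusing the rdd loaded in the read example above:

```java
import static java.util.Collections.singletonList;

// Apply a $match stage on the MongoDB side;
// only documents where "test" > 5 are returned to Spark
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(
        singletonList(Document.parse("{ $match: { test: { $gt: 5 } } }")));

System.out.println(aggregatedRdd.count());
System.out.println(aggregatedRdd.first().toJson());
```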
DataFrames and Datasets¶
To create a DataFrame from MongoDB data, load the data via DefaultSource or use the JavaRDD.toDF() method.
The following DataFrame examples use the collection myNewColl.
For example, the following creates a DataFrame using the MongoSpark.load(jsc).toDF() method and prints the schema:
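A sketch of this step, using Spark 1.6's DataFrame type:

```java
import org.apache.spark.sql.DataFrame;

// Convert the loaded RDD to a DataFrame; the schema is inferred by sampling documents
DataFrame df = MongoSpark.load(jsc).toDF();
df.printSchema();
```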
The operation prints the schema inferred from the sampled documents.
Note
By default, reading from MongoDB in a SQLContext infers the schema by sampling documents from the database. To explicitly declare a schema, see Explicitly Declare a Schema.
Explicitly Declare a Schema¶
By default, reading from MongoDB in a SQLContext infers the schema by sampling documents from the collection. You can also use a Java bean to define the schema explicitly, thus removing the extra queries needed for sampling.
The following example defines a Character Java bean and then passes the bean to the toDF() method to define the schema for the DataFrame:
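A sketch of such a bean; the name and age fields are assumptions chosen for illustration:

```java
import java.io.Serializable;

// Hypothetical Character bean used to declare the DataFrame schema explicitly
public final class Character implements Serializable {
    private String name;
    private Integer age;

    public String getName() { return name; }
    public void setName(final String name) { this.name = name; }
    public Integer getAge() { return age; }
    public void setAge(final Integer age) { this.age = age; }
}
```

The bean class is then passed to toDF() so the schema comes from the bean rather than from sampling:

```java
// Schema is taken from the Character bean instead of sampled documents
DataFrame explicitDF = MongoSpark.load(jsc).toDF(Character.class);
explicitDF.printSchema();
```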
The operation prints the schema declared by the Character bean.
To convert a JavaRDD to a Dataset, the Java API provides the toDS() method.
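A sketch, assuming the same Character bean; the exact toDS signature is an assumption based on the text above:

```java
import org.apache.spark.sql.Dataset;

// Convert the MongoDB RDD directly to a typed Dataset of Character beans
Dataset<Character> dataset = MongoSpark.load(jsc).toDS(Character.class);
System.out.println(dataset.first());
```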
SQL¶
SQL can be used to filter data. To use SQL, you need to register a temporary table first, and then you can run SQL queries over the data.
The following example registers a temporary table characters, then uses SQL to filter for characters with ages greater than or equal to 100:
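A sketch, assuming a SQLContext named sqlContext created from the same SparkContext and the Character bean defined earlier:

```java
import org.apache.spark.sql.SQLContext;

SQLContext sqlContext = SQLContext.getOrCreate(jsc.sc());

// Load with the explicit Character schema and register a temporary table
DataFrame charactersDF = MongoSpark.load(jsc).toDF(Character.class);
charactersDF.registerTempTable("characters");

// Filter with SQL; only characters aged 100 or older are returned
DataFrame centenarians = sqlContext.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();
```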
Important
You must use the same SQLContext object that registers the table and queries it.
Save DataFrames to MongoDB¶
The MongoDB Spark Connector provides the ability to persist DataFrames to a collection in MongoDB.
The following example saves the centenarians into the hundredClub collection in MongoDB and, to verify the save, reads from the hundredClub collection:
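A sketch under the assumptions above, reusing the centenarians DataFrame from the SQL example and the DefaultSource mentioned earlier; the write-side helper and option keys should be treated as assumptions:

```java
// Write the centenarians DataFrame to the "hundredClub" collection
MongoSpark.write(centenarians)
        .option("collection", "hundredClub")
        .mode("overwrite")
        .save();

// Read back from "hundredClub" via the DefaultSource to verify the save
DataFrame hundredClubDF = sqlContext.read()
        .format("com.mongodb.spark.sql.DefaultSource")
        .option("collection", "hundredClub")
        .load();
hundredClubDF.show();
```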
The operation prints the contents read back from the hundredClub collection.