
Spark Connector Java API

Source Code

For the source code that contains the examples below, see JavaIntroduction.java.

Prerequisites

  • Basic working knowledge of MongoDB and Apache Spark. Refer to the MongoDB documentation and Spark documentation.
  • Running MongoDB instance (version 2.6 or later).
  • Spark 1.6.x.
  • Scala 2.10.x if using the mongo-spark-connector_2.10 package
  • Scala 2.11.x if using the mongo-spark-connector_2.11 package

The Java API Basics

To facilitate the interaction between MongoDB and Spark, the MongoDB Spark Connector provides the com.mongodb.spark.api.java.MongoSpark helper.

For the configuration classes, use the Java-friendly create methods instead of the native Scala apply methods.
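
For example, the connector's configuration objects can be built with the Java-friendly factory methods. A minimal sketch, assuming the jsc JavaSparkContext created in the next section:

import com.mongodb.spark.config.ReadConfig;
import com.mongodb.spark.config.WriteConfig;

// ...

// Java-friendly factory methods; the Scala API uses ReadConfig.apply / WriteConfig.apply instead
ReadConfig defaultReadConfig = ReadConfig.create(jsc);
WriteConfig defaultWriteConfig = WriteConfig.create(jsc);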

JavaSparkContext

The Java API provides a JavaSparkContext that takes a SparkConf object.

Note

When specifying the Connector configuration via SparkConf, you must prefix the settings appropriately. For details and other available MongoDB Spark Connector options, see the Configuration Options.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// ...

SparkConf sc = new SparkConf()
        .setMaster("local")
        .setAppName("MongoSparkConnectorTour")
        .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
        .set("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection");

JavaSparkContext jsc = new JavaSparkContext(sc); // Create a Java Spark Context

  • The spark.mongodb.input.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), and the collection (myCollection) from which to read data. A read preference can also be included in the connection string (see the sketch after this list).
  • The spark.mongodb.output.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), and the collection (myCollection) to which to write data.
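
A minimal sketch of supplying a read preference through the connection string; the readPreference option is standard MongoDB connection-string syntax and does not appear in the configuration above:

SparkConf readPrefConf = new SparkConf()
        .setMaster("local")
        .setAppName("MongoSparkConnectorTour")
        // The read preference is carried as a connection-string option
        .set("spark.mongodb.input.uri",
                "mongodb://127.0.0.1/test.myCollection?readPreference=secondaryPreferred")
        .set("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection");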

The examples in this tutorial use this database and collection.

Write to MongoDB

Note

When saving RDD data into MongoDB, the data must be convertible to a BSON document. You may need to include a map transformation to convert the data into a Document (or BsonDocument or a DBObject).

The following code example saves 10 documents to the MongoDB collection specified in the JavaSparkContext object:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import com.mongodb.spark.MongoSpark;
import org.bson.Document;
import static java.util.Arrays.asList;

//...


JavaRDD<Document> documents = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
        .map(new Function<Integer, Document>() {
            public Document call(final Integer i) throws Exception {
                return Document.parse("{test: " + i + "}");
            }
        });

MongoSpark.save(documents);

Specify a WriteConfig

MongoSpark.save() can accept a WriteConfig object which specifies various write configuration settings, such as the collection or the write concern.

For example, the following code saves data to the spark collection with a majority write concern:

import java.util.HashMap;
import java.util.Map;

import com.mongodb.spark.config.WriteConfig;

// Saving data with a custom WriteConfig

Map<String, String> writeOverrides = new HashMap<String, String>();
writeOverrides.put("collection", "spark");
writeOverrides.put("writeConcern.w", "majority");
WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);

JavaRDD<Document> sparkDocuments = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
        .map(new Function<Integer, Document>() {
            public Document call(final Integer i) throws Exception {
                return Document.parse("{spark: " + i + "}");
            }
        });

// Saving data from an RDD to MongoDB
MongoSpark.save(sparkDocuments, writeConfig);

Read Data from MongoDB

You can pass a JavaSparkContext or a SQLContext to MongoSpark.load() for easy reading from MongoDB into a JavaMongoRDD. The following example loads the data we previously saved into the myCollection collection in the test database.

import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

// ...

// Loading and analyzing data from MongoDB
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
System.out.println(rdd.count());
System.out.println(rdd.first().toJson());

Specify a ReadConfig

MongoSpark.load() can accept a ReadConfig object which specifies various read configuration settings, such as the collection or the read preference.

The following example reads from the spark collection with a secondaryPreferred Read Preference:

import java.util.HashMap;
import java.util.Map;

import com.mongodb.spark.config.ReadConfig;

//...

// Loading data with a custom ReadConfig
Map<String, String> readOverrides = new HashMap<String, String>();
readOverrides.put("collection", "spark");
readOverrides.put("readPreference.name", "secondaryPreferred");
ReadConfig readConfig = ReadConfig.create(jsc).withOptions(readOverrides);

JavaRDD<Document> customRdd = MongoSpark.load(jsc, readConfig);

System.out.println(customRdd.count());
System.out.println(customRdd.first().toJson());

Aggregation

You can pass an aggregation pipeline to a JavaMongoRDD instance to filter data on the MongoDB server and pass only the matching documents to Spark.

The following example uses an aggregation pipeline to filter for documents in which the test field has a value greater than 5; only those matching documents are passed to Spark.

import static java.util.Collections.singletonList;

// Filtering an rdd using an aggregation pipeline before passing data to Spark
JavaRDD<Document> aggregatedRdd = rdd.withPipeline(
        singletonList(Document.parse("{ $match: { test : { $gt : 5 } } }")));
System.out.println(aggregatedRdd.count());
System.out.println(aggregatedRdd.first().toJson());

DataFrames and Datasets

To create a DataFrame from MongoDB data, either load the data through the connector's DefaultSource or call the toDF() method on the JavaMongoRDD returned by MongoSpark.load().

The DataFrame examples below use the myNewColl collection, which the following code populates:

// Saving data with a custom WriteConfig

Map<String, String> writeOverrides = new HashMap<String, String>();
writeOverrides.put("collection", "myNewColl");
WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);

List<String> characters = asList(
    "{'name': 'Bilbo Baggins', 'age': 50}",
    "{'name': 'Gandalf', 'age': 1000}",
    "{'name': 'Thorin', 'age': 195}",
    "{'name': 'Balin', 'age': 178}",
    "{'name': 'Kíli', 'age': 77}",
    "{'name': 'Dwalin', 'age': 169}",
    "{'name': 'Óin', 'age': 167}",
    "{'name': 'Glóin', 'age': 158}",
    "{'name': 'Fíli', 'age': 82}",
    "{'name': 'Bombur'}"
);
MongoSpark.save(jsc.parallelize(characters).map(new Function<String, Document>() {
    public Document call(final String json) throws Exception {
        return Document.parse(json);
    }
}), writeConfig);

For example, the following code creates a DataFrame from the myNewColl collection using the toDF() method and prints the inferred schema:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

SQLContext sqlContext = SQLContext.getOrCreate(jsc.sc());
DataFrame df = MongoSpark.load(jsc, ReadConfig.create(jsc).withOption("collection", "myNewColl")).toDF();
df.printSchema();

Note

By default, reading from MongoDB in a SQLContext infers the schema by sampling documents from the collection. To explicitly declare a schema, see Explicitly Declare a Schema.

The operation prints the following:

root
 |-- _id: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
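
As an alternative to toDF(), the same data can be loaded into a DataFrame through the connector's DefaultSource. A minimal sketch, assuming the sqlContext created above; the format string used here is the connector's DefaultSource class name, which is an assumption rather than a setting shown elsewhere on this page:

// Load a DataFrame through the connector's Spark SQL data source
DataFrame sourceDf = sqlContext.read()
        .format("com.mongodb.spark.sql.DefaultSource")
        .option("collection", "myNewColl")
        .load();
sourceDf.printSchema();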

Explicitly Declare a Schema

By default, reading from MongoDB in a SQLContext infers the schema by sampling documents from the collection. You can also use a Java bean to define the schema explicitly, thus removing the extra queries needed for sampling.
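
The Character bean itself is defined in JavaIntroduction.java. A minimal sketch of such a bean, assuming it simply mirrors the name and age fields of the documents saved above:

import java.io.Serializable;

public final class Character implements Serializable {
    private String name;
    private Integer age;

    // A no-argument constructor plus standard getters and setters let Spark
    // derive the DataFrame schema from the bean's properties
    public String getName() { return name; }
    public void setName(final String name) { this.name = name; }
    public Integer getAge() { return age; }
    public void setAge(final Integer age) { this.age = age; }
}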

The following example passes the Character bean to the toDF() method to define the schema for the DataFrame:

DataFrame explicitDF = MongoSpark.load(jsc, ReadConfig.create(jsc).withOption("collection", "myNewColl")).toDF(Character.class);
explicitDF.printSchema();

The operation prints the following output:

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

To convert a JavaMongoRDD to a Dataset, the Java API provides the toDS() method.
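
A minimal sketch of the conversion, assuming the Character bean sketched earlier; the import shown is the standard Spark SQL Dataset class:

import org.apache.spark.sql.Dataset;

// ...

// Convert the loaded RDD of Documents into a typed Dataset via the bean class
Dataset<Character> dataset = MongoSpark.load(jsc).toDS(Character.class);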

SQL

SQL can be used to filter data. To use SQL, you need to register a temporary table first, and then you can run SQL queries over the data. The following example registers a temporary table characters, then uses SQL to filter for characters with ages greater than or equal to 100:

explicitDF.registerTempTable("characters");
DataFrame centenarians = sqlContext.sql("SELECT name, age FROM characters WHERE age >= 100");

Important

You must use the same SQLContext object that registers the table and queries it.

Save DataFrames to MongoDB

The MongoDB Spark Connector provides the ability to persist DataFrames to a collection in MongoDB.

The following example saves centenarians to the hundredClub collection in MongoDB and, to verify the save, reads the hundredClub collection back:

MongoSpark.write(centenarians).option("collection", "hundredClub").mode("overwrite").save();

// Load the data from the "hundredClub" collection
MongoSpark.load(sqlContext, ReadConfig.create(sqlContext).withOption("collection", "hundredClub"), Character.class).show();

The operation prints the following output:

+-------+----+
|   name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
|  Balin| 178|
| Dwalin| 169|
|    Óin| 167|
|  Glóin| 158|
+-------+----+