Fix This Page
Navigation

Write to MongoDB

When saving RDD data into MongoDB, the data must be convertible to a BSON document. You may need to include a map transformation to convert the data into a Document (or BsonDocument or a DBObject).

The following example creates an RDD of 10 documents and saves it to the MongoDB collection specified in the SparkConf:

package com.mongodb.spark_examples;

import com.mongodb.spark.MongoSpark;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;

import org.bson.Document;

import static java.util.Arrays.asList;

public final class WriteToMongoDB {

  public static void main(final String[] args) throws InterruptedException {

    SparkSession spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate();

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Create a RDD of 10 documents
    JavaRDD<Document> documents = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map
            (new Function<Integer, Document>() {
      public Document call(final Integer i) throws Exception {
          return Document.parse("{test: " + i + "}");
      }
    });

    /*Start Example: Save data from RDD to MongoDB*****************/
    MongoSpark.save(sparkDocuments, writeConfig);
    /*End Example**************************************************/

    jsc.close();

  }

}

Using a WriteConfig

MongoSpark.save() can accept a WriteConfig object which specifies various write configuration settings, such as the collection or the write concern.

For example, the following code saves data to the spark collection with a majority write concern:

package com.mongodb.spark_examples;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.WriteConfig;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;

import org.bson.Document;

import static java.util.Arrays.asList;

import java.util.HashMap;
import java.util.Map;


public final class WriteToMongoDBWriteConfig {

  public static void main(final String[] args) throws InterruptedException {

    SparkSession spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate();

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Create a custom WriteConfig
    Map<String, String> writeOverrides = new HashMap<String, String>();
    writeOverrides.put("collection", "spark");
    writeOverrides.put("writeConcern.w", "majority");
    WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);

    // Create a RDD of 10 documents
    JavaRDD<Document> sparkDocuments = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map
         (new Function<Integer, Document>() {
       public Document call(final Integer i) throws Exception {
               return Document.parse("{spark: " + i + "}");
             }
      });

    /*Start Example: Save data from RDD to MongoDB*****************/
    MongoSpark.save(sparkDocuments, writeConfig);
    /*End Example**************************************************/

    jsc.close();

  }

}