Write to MongoDB
When saving RDD data into MongoDB, the data must be convertible to a BSON document. You may need to include a map transformation to convert the data into a Document (or BsonDocument or a DBObject).
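For illustration only, here is a minimal sketch of such a map transformation. It assumes the same imports and the JavaSparkContext jsc used in the full example below; the field name and string values are hypothetical:

// Hypothetical sketch: turn an RDD of plain strings into BSON documents
// so they can be passed to MongoSpark.save().
JavaRDD<Document> docs = jsc.parallelize(asList("Ada", "Grace", "Alan"))
    .map(new Function<String, Document>() {
      public Document call(final String name) throws Exception {
        return new Document("name", name);   // field name is hypothetical
      }
    });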
The following example creates a 10-document RDD and saves it to the MongoDB collection specified in the SparkConf:
package com.mongodb.spark_examples;

import com.mongodb.spark.MongoSpark;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;

import org.bson.Document;

import static java.util.Arrays.asList;


public final class WriteToMongoDB {

  public static void main(final String[] args) throws InterruptedException {

    SparkSession spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate();

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Create an RDD of 10 documents
    JavaRDD<Document> documents = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
        .map(new Function<Integer, Document>() {
      public Document call(final Integer i) throws Exception {
        return Document.parse("{test: " + i + "}");
      }
    });

    /*Start Example: Save data from RDD to MongoDB*****************/
    MongoSpark.save(documents);
    /*End Example**************************************************/

    jsc.close();

  }
}
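MongoSpark.save(documents) writes to the collection given by spark.mongodb.output.uri. As an optional follow-up (a sketch, not part of the original example), the write can be spot-checked by loading the collection back and counting its documents:

// Optional verification sketch: read the collection configured in
// spark.mongodb.input.uri back into an RDD and count its documents.
long count = MongoSpark.load(jsc).count();
System.out.println("Documents in collection: " + count);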
Using a WriteConfig
MongoSpark.save() can accept a WriteConfig object, which specifies various write configuration settings, such as the collection or the write concern.
For example, the following code saves data to the spark collection with a majority write concern:
package com.mongodb.spark_examples;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.WriteConfig;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;

import org.bson.Document;

import static java.util.Arrays.asList;

import java.util.HashMap;
import java.util.Map;


public final class WriteToMongoDBWriteConfig {

  public static void main(final String[] args) throws InterruptedException {

    SparkSession spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate();

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Create a custom WriteConfig
    Map<String, String> writeOverrides = new HashMap<String, String>();
    writeOverrides.put("collection", "spark");
    writeOverrides.put("writeConcern.w", "majority");
    WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);

    // Create an RDD of 10 documents
    JavaRDD<Document> sparkDocuments = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
        .map(new Function<Integer, Document>() {
      public Document call(final Integer i) throws Exception {
        return Document.parse("{spark: " + i + "}");
      }
    });

    /*Start Example: Save data from RDD to MongoDB*****************/
    MongoSpark.save(sparkDocuments, writeConfig);
    /*End Example**************************************************/

    jsc.close();

  }
}
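WriteConfig.create(jsc) starts from the defaults in the SparkConf (spark.mongodb.output.uri), and withOptions overrides only the keys you supply. As a hedged variant of the example above (the database name here is hypothetical), other output settings such as the target database can be overridden in the same way:

// Sketch: override database, collection, and write concern;
// any setting not listed still comes from the SparkConf defaults.
Map<String, String> overrides = new HashMap<String, String>();
overrides.put("database", "analytics");   // hypothetical database name
overrides.put("collection", "spark");
overrides.put("writeConcern.w", "majority");

WriteConfig customWriteConfig = WriteConfig.create(jsc).withOptions(overrides);
MongoSpark.save(sparkDocuments, customWriteConfig);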