Navigation

Read from MongoDB

Pass a JavaSparkContext to MongoSpark.load() to read from MongoDB into a JavaMongoRDD. The following example loads the data from the myCollection collection in the test database that was saved as part of the write example.

package com.mongodb.spark_examples;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

/**
 * Example program that loads the collection named by {@code spark.mongodb.input.uri}
 * ({@code test.myCollection} on {@code 127.0.0.1}) into a {@link JavaMongoRDD} and prints
 * the number of documents followed by the first document rendered as JSON.
 */
public final class ReadFromMongoDB {

  /**
   * Runs the read example.
   *
   * @param args command-line arguments; not used
   * @throws InterruptedException kept in the signature for compatibility; nothing in this
   *     method throws it directly
   */
  public static void main(final String[] args) throws InterruptedException {

    SparkSession spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate();

    // Create a JavaSparkContext using the SparkSession's SparkContext object.
    // try-with-resources guarantees the context is closed even when loading or
    // analyzing the data throws, instead of only on the success path.
    try (JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext())) {

      /*Start Example: Read data from MongoDB************************/
      JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
      /*End Example**************************************************/

      // Analyze data from MongoDB
      // NOTE(review): first() presumably fails on an empty collection; the example
      // expects the data saved by the write example to be present.
      System.out.println(rdd.count());
      System.out.println(rdd.first().toJson());
    }

  }
}

Specify a ReadConfig

MongoSpark.load() also accepts a ReadConfig object that specifies read configuration settings, such as the collection or the read preference.

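The following example reads from the spark collection with a secondaryPreferred read preference. Settings that are not overridden, such as the database (test), are still taken from the SparkSession configuration: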

package com.mongodb.spark_examples;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;


/**
 * Example program that reads from MongoDB with a custom {@link ReadConfig}: the collection
 * is overridden to {@code spark} and the read preference to {@code secondaryPreferred}.
 * It prints the number of documents followed by the first document rendered as JSON.
 */
public final class ReadFromMongoDBReadConfig {

  /**
   * Runs the read example with a custom {@link ReadConfig}.
   *
   * @param args command-line arguments; not used
   * @throws InterruptedException kept in the signature for compatibility; nothing in this
   *     method throws it directly
   */
  public static void main(final String[] args) throws InterruptedException {

    SparkSession spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate();

    // Create a JavaSparkContext using the SparkSession's SparkContext object.
    // try-with-resources guarantees the context is closed even when loading or
    // analyzing the data throws, instead of only on the success path.
    try (JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext())) {

      /*Start Example: Read data from MongoDB************************/

      // Create a custom ReadConfig: start from the configuration derived from the
      // context and apply the overrides on top of it.
      Map<String, String> readOverrides = new HashMap<>();
      readOverrides.put("collection", "spark");
      readOverrides.put("readPreference.name", "secondaryPreferred");
      ReadConfig readConfig = ReadConfig.create(jsc).withOptions(readOverrides);

      // Load data using the custom ReadConfig
      JavaMongoRDD<Document> customRdd = MongoSpark.load(jsc, readConfig);

      /*End Example**************************************************/

      // Analyze data from MongoDB
      // NOTE(review): first() presumably fails on an empty collection; confirm the
      // spark collection contains data before running.
      System.out.println(customRdd.count());
      System.out.println(customRdd.first().toJson());
    }

  }
}