Fix This Page
Navigation

Aggregation

Pass an aggregation pipeline to a JavaMongoRDD instance to filter data and perform aggregations in MongoDB before passing documents to Spark.

The following example uses an aggregation pipeline to perform the same filter operation as the example above: it keeps only the documents where the test field has a value greater than 5:

package com.mongodb.spark_examples;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

import static java.util.Collections.singletonList;

/**
 * Example showing how to apply an aggregation pipeline to a {@code JavaMongoRDD}
 * so that filtering happens in MongoDB before documents are passed to Spark.
 */
public final class Aggregation {

  /**
   * Builds a local Spark session, loads an RDD from MongoDB, applies a
   * {@code $match} pipeline stage ({@code test > 5}) and prints the number of
   * matching documents followed by the first match, if there is one.
   *
   * @param args command-line arguments (unused)
   * @throws InterruptedException declared for compatibility with existing callers
   */
  public static void main(final String[] args) throws InterruptedException {

    SparkSession spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate();

    // Create a JavaSparkContext using the SparkSession's SparkContext object.
    // try-with-resources closes the context even when a Spark job below fails,
    // so a failed run does not leave the context open.
    try (JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext())) {

      // Load data from MongoDB
      JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);

      /*Start Example: Use aggregation to filter a RDD***************/
      JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(
        singletonList(
          Document.parse("{ $match: { test : { $gt : 5 } } }")));
      /*End Example**************************************************/

      // Analyze data from MongoDB
      long matchCount = aggregatedRdd.count();
      System.out.println(matchCount);

      // first() fails on an empty RDD, so only print a sample document when
      // the pipeline actually matched something.
      if (matchCount > 0) {
        System.out.println(aggregatedRdd.first().toJson());
      }
    }

  }
}