Fix This Page
Navigation

Filters and Aggregation

Depending on the dataset, filtering data using MongoDB’s aggregation framework may perform more efficiently than the direct use of RDD filters and dataset filters.

The following sections use the myCollection collection in the test database that is configured in the SparkSession:

{ "_id" : 1, "test" : 1 }
{ "_id" : 2, "test" : 2 }
{ "_id" : 3, "test" : 3 }
{ "_id" : 4, "test" : 4 }
{ "_id" : 5, "test" : 5 }
{ "_id" : 6, "test" : 6 }
{ "_id" : 7, "test" : 7 }
{ "_id" : 8, "test" : 8 }
{ "_id" : 9, "test" : 9 }
{ "_id" : 10, "test" : 10 }

Filters

The following example uses the RDD defined above and filters for all documents where the test field has a value greater than 5:

val rdd = MongoSpark.load(sc)

val filteredRdd = rdd.filter(doc => doc.getInteger("test") > 5)
println(filteredRdd.count)
println(filteredRdd.first.toJson)

Aggregation

Pass an aggregation pipeline to a MongoRDD instance to filter data and perform aggregations in MongoDB before passing documents to Spark.

The following example uses an aggregation pipeline to perform the same filter operation as the example above; filter all documents where the test field has a value greater than 5:

val rdd = MongoSpark.load(sc)

val aggregatedRdd = rdd.withPipeline(Seq(Document.parse("{ $match: { test : { $gt : 5 } } }")))
println(aggregatedRdd.count)
println(aggregatedRdd.first.toJson)

Any valid aggregation pipeline can be specified in the example above.

Aggregation pipelines handle null results whereas the filter methods do not. If the filter does not match any documents, the operation throws the following exception:

ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 8) java.lang.NullPointerException