
Aggregation

Use MongoDB’s aggregation pipeline to apply filtering rules and perform aggregation operations when reading data from MongoDB into Spark.

Consider a collection named fruit that contains the following documents:

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

From within the pyspark shell, add the option() method to the spark.read chain to specify an aggregation pipeline to use when creating a DataFrame.

# Match only the documents whose type is "apple"; MongoDB runs the
# pipeline server-side before the results are loaded into Spark.
pipeline = "{'$match': {'type': 'apple'}}"
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("pipeline", pipeline).load()
df.show()
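
The example above assumes that the pyspark shell was started with the MongoDB Spark Connector available and a default input URI configured. If you construct the SparkSession yourself, a minimal sketch of the equivalent configuration looks like the following; the URI, database, and collection names are placeholders for your own deployment, and the connector package must already be available to the Spark runtime.

from pyspark.sql import SparkSession

# Point the connector's default input at the fruit collection.
# Placeholder URI; replace with your own deployment's address.
spark = SparkSession.builder \
    .appName("aggregation-example") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/test.fruit") \
    .getOrCreate()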

When you call df.show() in the pyspark shell, the operation prints the following output:

+---+---+-----+
|_id|qty| type|
+---+---+-----+
|1.0|5.0|apple|
+---+---+-----+
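
A pipeline can also contain more than one stage. The following sketch filters on qty and then projects away everything except the type field; it assumes that the stages can be passed as a JSON array string, based on the connector accepting extended JSON for this option. Verify the exact syntax against your connector version's documentation.

# Keep fruit with qty greater than 5, then return only the type field.
# Assumption: the pipeline option accepts a JSON array of stages.
pipeline = "[{'$match': {'qty': {'$gt': 5}}}, {'$project': {'type': 1, '_id': 0}}]"
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("pipeline", pipeline).load()
df.show()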