
Aggregation

Use MongoDB’s aggregation pipeline to apply filtering rules and perform aggregation operations when reading data from MongoDB into Spark.

Consider a collection named fruit that contains the following documents:

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

From within the pyspark shell, call the option() method on spark.read to specify an aggregation pipeline to use when creating a DataFrame.

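# Assumes the shell was started with spark.mongodb.input.uri pointing at the fruit collection.
# A single $match stage that keeps only documents whose type is "apple".
pipeline = "{'$match': {'type': 'apple'}}"
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("pipeline", pipeline).load()
df.show()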
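According to the connector's configuration reference, the pipeline option accepts either a single extended JSON document, as above, or a list of stage documents. Instead of hand-writing quoted JSON, you can build the stages as Python dicts and serialize them with json.dumps. This is a minimal sketch: build_pipeline is a hypothetical helper, and the $match/$group stages are an illustration rather than part of the original example.

```python
import json

def build_pipeline(stages):
    """Serialize a list of aggregation stages into the JSON string
    passed to the connector's "pipeline" read option (hypothetical helper)."""
    # json.dumps emits standard double-quoted JSON, which avoids
    # hand-escaping quotes inside a Python string literal.
    return json.dumps(stages)

# Illustrative two-stage pipeline: keep apples and oranges,
# then total their quantities per fruit type.
stages = [
    {"$match": {"type": {"$in": ["apple", "orange"]}}},
    {"$group": {"_id": "$type", "total_qty": {"$sum": "$qty"}}},
]
pipeline = build_pipeline(stages)
print(pipeline)
```

The resulting string is passed the same way as before: spark.read.format("com.mongodb.spark.sql.DefaultSource").option("pipeline", pipeline).load().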

In the pyspark shell, the operation prints the following output:

+---+---+-----+
|_id|qty| type|
+---+---+-----+
|1.0|5.0|apple|
+---+---+-----+
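Only one row appears because $match passes through just the documents whose type equals apple; MongoDB applies the stage before sending data to Spark. The sketch below models that equality filter locally on the sample documents. match_equality is a hypothetical helper for illustration only: it is not part of the connector and does not reproduce MongoDB's full query semantics.

```python
# The three sample documents from the fruit collection.
fruit = [
    {"_id": 1, "type": "apple", "qty": 5},
    {"_id": 2, "type": "orange", "qty": 10},
    {"_id": 3, "type": "banana", "qty": 15},
]

def match_equality(docs, criteria):
    """Keep documents whose fields equal every value in criteria.

    Only models simple equality conditions such as {"type": "apple"};
    query operators like $in or $gt are not handled.
    """
    return [
        doc for doc in docs
        if all(doc.get(field) == value for field, value in criteria.items())
    ]

matched = match_equality(fruit, {"type": "apple"})
print(matched)  # [{'_id': 1, 'type': 'apple', 'qty': 5}]
```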