Use MongoDB’s aggregation pipeline to apply filtering rules and perform aggregation operations when reading data from MongoDB into Spark.

Consider a collection named fruit that contains the following documents:

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

From within the sparkR shell, pass a pipeline argument to read.df() to specify the aggregation pipeline to apply when creating a DataFrame.

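# Aggregation stage that keeps only the documents whose type is "apple"
agg_pipeline <- "{'$match': {'type': 'apple'}}"
# Load the collection through the MongoDB connector, applying the pipeline on read
df <- read.df("", source = "com.mongodb.spark.sql.DefaultSource", pipeline = agg_pipeline)
# Display the first rows of the resulting DataFrame
head(df)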


The first argument to read.df() is the path of a file to use as a data source. Because the data source here is a MongoDB collection rather than a file, that argument is an empty string ("").

In the sparkR shell, the operation prints the following output:

  _id qty  type
1   1   5 apple
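
To check the expected result without a Spark cluster or a MongoDB instance, the effect of the $match stage can be reproduced on a local copy of the data. The following is only a base R sketch of the filter's semantics, not the connector's implementation. The names fruit and matched are hypothetical, and the data frame is a stand-in for the collection.

```r
# Local stand-in for the fruit collection (hypothetical; not loaded from MongoDB).
# check.names = FALSE keeps the column name "_id" as written.
fruit <- data.frame(
  `_id` = c(1, 2, 3),
  qty   = c(5, 10, 15),
  type  = c("apple", "orange", "banana"),
  check.names = FALSE,
  stringsAsFactors = FALSE
)

# Equivalent of {'$match': {'type': 'apple'}}: keep rows whose type is "apple".
matched <- fruit[fruit$type == "apple", ]

print(matched)
```

The printed row should correspond to the single document that the pipeline returns from the collection.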