
Aggregation

Use MongoDB’s aggregation pipeline to apply filtering rules and perform aggregation operations when reading data from MongoDB into Spark.

Consider a collection named fruit that contains the following documents:

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

From within the sparkR shell, add a pipeline argument to read.df() to specify the aggregation pipeline to apply when creating a DataFrame.

# Match only the documents whose type field is "apple"
agg_pipeline <- "{'$match': {'type': 'apple'}}"
df <- read.df("", source = "com.mongodb.spark.sql.DefaultSource", pipeline = agg_pipeline)
head(df)

Note

The empty string (“”) passed as the first argument to read.df() is the path to a file to use as the data source. Because the data source here is a MongoDB collection rather than a file, the path is left empty; the connector instead reads from the collection specified by the spark.mongodb.input.uri configuration setting.
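
If the input URI was not set when the sparkR shell was started, it can also be supplied as an option on the read itself. The following is a minimal sketch; the mongodb://127.0.0.1/test.fruit URI is an assumption about where the fruit collection lives:

# Assumed URI; replace with the location of your fruit collection
df <- read.df("", source = "com.mongodb.spark.sql.DefaultSource",
              uri = "mongodb://127.0.0.1/test.fruit",
              pipeline = agg_pipeline)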

In the sparkR shell, the operation prints the following output:

  _id qty  type
1   1   5 apple
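
A pipeline with multiple stages can be supplied as a JSON array in the same way. The following is a sketch, not part of the original example: it filters out documents with qty of 5 or less, then sums the remaining quantities with a $group stage.

# Two-stage pipeline: filter on qty, then sum the matching quantities
agg_pipeline <- "[
  {'$match': {'qty': {'$gt': 5}}},
  {'$group': {'_id': null, 'total': {'$sum': '$qty'}}}
]"
df <- read.df("", source = "com.mongodb.spark.sql.DefaultSource", pipeline = agg_pipeline)
head(df)

With the sample fruit documents, this would match the orange and banana documents and return a single row with a total of 25.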