Aggregation
Use MongoDB's aggregation pipeline to apply filtering rules and perform aggregation operations when reading data from MongoDB into Spark.
Consider a collection named fruit that contains the following documents:
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
Add a pipeline argument to read.df() from within the sparkR shell to specify an aggregation pipeline to use when creating a DataFrame.
agg_pipeline <- "{'$match': {'type': 'apple'}}"
df <- read.df("", source = "com.mongodb.spark.sql.DefaultSource", pipeline = agg_pipeline)
head(df)
Note
The empty first argument ("") refers to a file path to use as the data source. Because the data source here is a MongoDB collection rather than a file, the path argument is left empty.
In the sparkR shell, the operation prints the following output:
  _id qty  type
1   1   5 apple
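The pipeline string can also describe more than one stage. The following sketch assumes the pipeline argument accepts an extended JSON array of stages; it uses $match followed by $group to total the qty field for each fruit type. The totalQty field name is illustrative.

# A sketch of a multi-stage pipeline, assuming the pipeline argument
# also accepts a JSON array of stages.
agg_pipeline <- "[
  {'$match': {'qty': {'$gte': 10}}},
  {'$group': {'_id': '$type', 'totalQty': {'$sum': '$qty'}}}
]"
df <- read.df("", source = "com.mongodb.spark.sql.DefaultSource", pipeline = agg_pipeline)
head(df)

Because filtering and grouping run inside MongoDB before the results reach Spark, only the aggregated documents are transferred to the DataFrame.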