Run Aggregation Pipelines

Overview

The code snippets on this page demonstrate how to configure and run aggregation pipelines against a collection. Aggregation operations run all documents in a collection through a series of data aggregation stages that allow you to filter and shape documents as well as collect summary data about groups of related documents.

Supported Aggregation Stages

MongoDB Realm supports nearly all MongoDB aggregation pipeline stages and operators, but some stages must be executed within a system function. See Aggregation Framework Limitations for more information.

Data Model

The examples on this page use a collection named store.purchases that contains information about historical item sales in an online store. Each document contains a list of the purchased items, including the item name and the purchased quantity, as well as a unique ID value for the customer that purchased the items.

// store.purchases
{
    _id:      <ObjectID>,
    customerId: <ObjectID>,
    items: [ { name: <string>, quantity: <int> } ]
}

Snippet Setup

To use a code snippet in a function, you must first instantiate a MongoDB collection handle:

exports = function() {
  const mongodb = context.services.get("mongodb-atlas");
  const itemsCollection = mongodb.db("store").collection("items");
  const purchasesCollection = mongodb.db("store").collection("purchases");
}

To use a code snippet in an iOS project, you must first do the following:

1

Set Up Your Project

Follow the steps in the Install Realm for iOS, macOS, tvOS, and watchOS guide.

2

Import Realm Dependencies

In scope (e.g. UIViewController)
import RealmCore
import RealmCoreRemoteMongoDBService
import RealmRemoteMongoDBService

3

Initialize the MongoDB Realm iOS SDK

Application Startup (e.g. AppDelegate.didFinishLaunchingWithOptions())
do {
    let _ = try Realm.initializeDefaultAppClient(
        withClientAppID: "YOUR-APP-ID"
    )
} catch {
    print("Failed to initialize MongoDB Realm iOS SDK: \(error)")
}

4

Instantiate a MongoDB Collection Handle

In scope (e.g. UIViewController)
// Variables in scope:
private lazy var realmClient = Realm.defaultAppClient!
private var mongoClient: RemoteMongoClient?
private var itemsCollection: RemoteMongoCollection<Document>?
private var purchasesCollection: RemoteMongoCollection<Document>?

// Set the realm variables declared above in viewDidLoad()
mongoClient = realmClient.serviceClient(
    fromFactory: remoteMongoClientFactory,
    withName: "mongodb-atlas"
)
itemsCollection = mongoClient?.db("store").collection("items")
purchasesCollection = mongoClient?.db("store").collection("purchases")

To use a code snippet in an Android project, you must first do the following:

1

Set Up Your Project

Follow the steps in the Install Realm for Android guide.

2

Import Realm Dependencies

For CRUD operations on a remote MongoDB collection, you will use one or more of the following import statements:

// Base Realm Packages
import io.realm.Realm;
import io.realm.mongodb.App;
import io.realm.mongodb.AppConfiguration;
// Realm Authentication Packages
import io.realm.mongodb.User;
import io.realm.mongodb.Credentials;

// MongoDB Service Packages
import io.realm.mongodb.mongo.MongoClient;
import io.realm.mongodb.mongo.MongoDatabase;
import io.realm.mongodb.mongo.MongoCollection;
// Utility Packages
import org.bson.Document;

3

Instantiate a MongoDB Collection Handle

Top of Activity File
private App realmApp;
private MongoClient mongoClient;
private MongoCollection<Document> itemsCollection;
private User user;

In Activity.onCreate()
String appID = "<your app ID>"; // replace this with your App ID
Realm.init(this); // initialize Realm, required before interacting with SDK
realmApp = new App(new AppConfiguration.Builder(appID)
           .build());

// an authenticated user is required to access a MongoDB instance
Credentials credentials = Credentials.anonymous();
realmApp.loginAsync(credentials, it -> {
    if (it.isSuccess()) {
        user = realmApp.currentUser();
        // assign the fields declared at the top of the Activity file
        mongoClient = user.getMongoClient("<atlas service name>");
        if (mongoClient != null) {
           itemsCollection = mongoClient.getDatabase("<database name>").getCollection("<collection name>");
        } else {
           Log.e(TAG, "Error connecting to the MongoDB instance.");
        }
    } else {
       Log.e(TAG, "Error logging into the Realm app. Is anonymous authentication enabled?");
    }
});

To use a code snippet in a JavaScript project, you must first do the following:

1

Import Realm Dependencies

Import from CDN
<!-- Import the Realm JS SDK at the top of the file -->
<script src="https://s3.amazonaws.com/realm-sdks/js/bundles/4/realm.js"></script>
<script>
  // Destructure Realm JS SDK Components
  const { Realm, RemoteMongoClient, BSON } = realm;
</script>

– or –

Import from Module
// Import components of the Realm JS SDK at the top of the file
import {
  Realm,
  RemoteMongoClient,
  BSON
} from "mongodb-realm-browser-sdk";

2

Instantiate a MongoDB Remote Collection Handle

const realmApp = Realm.initializeDefaultAppClient("<Your App ID>");
const mongodb = realmApp.getServiceClient(RemoteMongoClient.factory, "mongodb-atlas");

const itemsCollection = mongodb.db("store").collection("items");
const purchasesCollection = mongodb.db("store").collection("purchases");

Methods

Aggregate Documents in a Collection

You can execute an aggregation pipeline using the collection.aggregate() action.

The following snippet groups all documents in the purchases collection by their customerId value. For each customer, it counts the number of purchases that they made and sums the number of entries in the items array across those purchases. After grouping the documents, the pipeline adds a new field, averageNumItemsPurchased, to each customer’s document that holds the average number of items the customer purchases at a time:

const pipeline = [
  { "$group": {
      "_id": "$customerId",
      "numPurchases": { "$sum": 1 },
      "numItemsPurchased": { "$sum": { "$size": "$items" } }
  } },
  { "$addFields": {
      "averageNumItemsPurchased": {
        "$divide": ["$numItemsPurchased", "$numPurchases"]
      }
  } }
]

return purchasesCollection.aggregate(pipeline).toArray()
  .then(customers => {
    console.log(`Successfully grouped purchases for ${customers.length} customers.`)
    for(const customer of customers) {
      console.log(`customer: ${customer._id}`)
      console.log(`num purchases: ${customer.numPurchases}`)
      console.log(`total items purchased: ${customer.numItemsPurchased}`)
      console.log(`average items per purchase: ${customer.averageNumItemsPurchased}`)
    }
    return customers
  })
  .catch(err => console.error(`Failed to group purchases by customer: ${err}`))

let pipeline : [Document] = [
    [ "$group": [
        "_id": "$customerId",
        "numPurchases": ["$sum": 1] as Document,
        "numItemsPurchased": ["$sum": ["$size": "$items"] as Document] as Document,
    ] as Document] as Document,
    [ "$addFields": [
        "averageNumItemsPurchased": [
            // divide the two fields produced by the $group stage above
            "$divide": ["$numItemsPurchased", "$numPurchases"] as Document
        ] as Document
    ] as Document ] as Document
];

purchasesCollection?.aggregate(pipeline).toArray({ result in
    switch result {
    case .success(let customers):
        print("Successfully grouped purchases for \(customers.count) customers:");
        customers.forEach({ customer in
            print("customer id: \(customer._id)");
            print("num purchases: \(customer.numPurchases)");
            print("total items purchased: \(customer.numItemsPurchased)");
            print("average items purchased: \(customer.averageNumItemsPurchased)");
        })
    case .failure(let error):
        print("Failed to group purchases by customer: \(error)");
    }
})

List<Document> aggregationPipeLine = List.of(
    new Document(
        "$group", new Document()
            .append("_id", "$customerId")
            .append("numPurchases", new Document(
                "$sum", 1
            ))
            .append("numItemsPurchased", new Document(
                "$sum", new Document("$size", "$items")
            ))
    ),
    new Document(
        "$addFields", new Document(
            "averageNumItemsPurchased", new Document(
                // divide the two fields produced by the $group stage above
                "$divide", List.of("$numItemsPurchased", "$numPurchases")
            )
        )
    )
);

purchasesCollection.aggregate(aggregationPipeLine).forEach(item -> {
    Log.d("app", String.format("aggregation result:  %s", item.toString()));
});

// Another way to iterate through
Task<List<Document>> itemsTask = purchasesCollection
    .aggregate(aggregationPipeLine)
    .into(new ArrayList<Document>());
itemsTask.addOnCompleteListener(new OnCompleteListener <List<Document>> () {
    @Override
    public void onComplete(@NonNull Task <List<Document>> task) {
        if (task.isSuccessful()) {
            List<Document> items = task.getResult();
            Log.d("app", String.format("%d aggregation results", items.size()));
            for (Document item: items) {
                Log.d("app", String.format("aggregation result:  %s", item.toString()));
            }
        } else {
            Log.e("app", "failed to perform aggregation with: ", task.getException());
        }
    }
});

const pipeline = [
  { "$group": {
      "_id": "$customerId",
      "numPurchases": { "$sum": 1 },
      "numItemsPurchased": { "$sum": { "$size": "$items" } }
  } },
  { "$addFields": {
      "averageNumItemsPurchased": {
        "$divide": ["$numItemsPurchased", "$numPurchases"]
      }
  } }
]

purchasesCollection.aggregate(pipeline).toArray()
  .then(customers => {
    console.log(`Successfully grouped purchases for ${customers.length} customers.`)
    for(const customer of customers) {
      console.log(`customer: ${customer._id}`)
      console.log(`num purchases: ${customer.numPurchases}`)
      console.log(`total items purchased: ${customer.numItemsPurchased}`)
      console.log(`average items per purchase: ${customer.averageNumItemsPurchased}`)
    }
    return customers
  })
  .catch(err => console.error(`Failed to group purchases by customer: ${err}`))
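
To make the shape of the result easier to picture, the following plain JavaScript sketch reproduces the same calculation in memory. It is only an illustration under stated assumptions: the sample documents and the helper name summarizePurchases are hypothetical, and the real pipeline runs on the server rather than in your application code.

```javascript
// Hypothetical documents that follow the store.purchases data model.
const samplePurchases = [
  { customerId: "c1", items: [{ name: "Baseball", quantity: 5 }, { name: "Baseball Bat", quantity: 1 }] },
  { customerId: "c1", items: [{ name: "Baseball Mitt", quantity: 1 }] },
  { customerId: "c2", items: [{ name: "Baseball", quantity: 2 }] },
];

// In-memory counterpart of the $group and $addFields stages (illustration only).
function summarizePurchases(docs) {
  const byCustomer = new Map();
  for (const doc of docs) {
    const summary = byCustomer.get(doc.customerId) ||
      { _id: doc.customerId, numPurchases: 0, numItemsPurchased: 0 };
    summary.numPurchases += 1;                     // { "$sum": 1 }
    summary.numItemsPurchased += doc.items.length; // { "$sum": { "$size": "$items" } }
    byCustomer.set(doc.customerId, summary);
  }
  // $addFields keeps the grouped fields and appends the computed average.
  return Array.from(byCustomer.values()).map(summary => ({
    ...summary,
    averageNumItemsPurchased: summary.numItemsPurchased / summary.numPurchases,
  }));
}

const customerSummaries = summarizePurchases(samplePurchases);
console.log(customerSummaries);
```

Note that $size counts the entries in the items array, so numItemsPurchased reflects the number of distinct line items rather than the sum of their quantity values.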

Aggregation Stages

Filter Documents

You can use the $match stage to filter incoming documents according to a standard query filter.

{
  "$match": {
    "<Field Name>": <Query Expression>,
    ...
  }
}

Example

The following $match stage filters incoming documents to include only those where the graduation_year field has a value between 2019 and 2024, inclusive.

{
  "$match": {
    "graduation_year": {
      "$gte": 2019,
      "$lte": 2024
    }
  }
}
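
As a rough illustration of what this filter keeps, the sketch below applies the same inclusive range check to a few in-memory documents in plain JavaScript. The sample documents and the helper name matchesGraduationYear are hypothetical; the stage itself is evaluated by MongoDB, so this only mirrors its filtering logic.

```javascript
// Hypothetical sample documents; only graduation_year comes from the example above.
const students = [
  { name: "A", graduation_year: 2018 },
  { name: "B", graduation_year: 2019 },
  { name: "C", graduation_year: 2024 },
  { name: "D", graduation_year: 2025 },
];

// The same stage as above, written as a JavaScript object.
const matchStage = {
  "$match": { "graduation_year": { "$gte": 2019, "$lte": 2024 } }
};

// In-memory equivalent of the filter: keep documents inside the inclusive range.
function matchesGraduationYear(doc, filter) {
  const { $gte, $lte } = filter.graduation_year;
  return doc.graduation_year >= $gte && doc.graduation_year <= $lte;
}

const matched = students.filter(doc => matchesGraduationYear(doc, matchStage.$match));
console.log(matched.map(doc => doc.name)); // documents B and C pass the filter
```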

Group Documents

You can use the $group stage to aggregate summary data for groups of one or more documents. MongoDB groups documents based on the _id expression.

Note

You can reference a specific document field by prefixing the field name with a $.

{
  "$group": {
    "_id": <Group By Expression>,
    "<Field Name>": <Aggregation Expression>,
    ...
  }
}

Example

The following $group stage groups documents by the value of their customerId field and calculates the number of purchase documents that each customerId appears in.

{
  "$group": {
    "_id": "$customerId",
    "numPurchases": { "$sum": 1 }
  }
}
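
The grouping behavior can be sketched in plain JavaScript as follows. This is a minimal in-memory approximation, assuming hypothetical sample documents and a hypothetical helper named countPurchasesByCustomer; it is not how MongoDB implements the stage.

```javascript
// Hypothetical purchase documents that follow the store.purchases data model.
const purchases = [
  { _id: 1, customerId: "c1", items: [{ name: "Baseball", quantity: 5 }] },
  { _id: 2, customerId: "c2", items: [{ name: "Baseball Bat", quantity: 1 }] },
  { _id: 3, customerId: "c1", items: [{ name: "Baseball Mitt", quantity: 1 }] },
];

// In-memory counterpart of { "_id": "$customerId", "numPurchases": { "$sum": 1 } }.
function countPurchasesByCustomer(docs) {
  const counts = new Map();
  for (const doc of docs) {
    counts.set(doc.customerId, (counts.get(doc.customerId) || 0) + 1);
  }
  // Emit one output document per distinct customerId, keyed by _id.
  return Array.from(counts, ([customerId, numPurchases]) => ({ _id: customerId, numPurchases }));
}

const grouped = countPurchasesByCustomer(purchases);
console.log(grouped);
```

The sketch returns groups in first-seen order because it uses a Map. The $group stage itself does not guarantee any output order, so add a $sort stage if order matters.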

Project Document Fields

You can use the $project stage to include or omit specific fields from documents or to calculate new fields using aggregation operators. To include a field, set its value to 1. To omit a field, set its value to 0.

Note

You cannot simultaneously omit and include fields other than _id. If you explicitly include a field other than _id, any fields you did not explicitly include are automatically omitted (and vice versa).

{
  "$project": {
    "<Field Name>": <0 | 1 | Expression>,
    ...
  }
}

Example

The following $project stage omits the _id field, includes the customerId field, and creates a new field named numItems where the value is the number of elements in the items array:

{
  "$project": {
    "_id": 0,
    "customerId": 1,
    "numItems": { "$sum": { "$size": "$items" } }
  }
}
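
To show the effect on a single document, the following plain JavaScript sketch builds the same output shape in memory. The sample document and the helper name projectPurchase are hypothetical and serve only as an illustration of the projection.

```javascript
// Hypothetical purchase document that follows the store.purchases data model.
const purchase = {
  _id: 123,
  customerId: 24601,
  items: [
    { name: "Baseball", quantity: 5 },
    { name: "Baseball Mitt", quantity: 1 },
    { name: "Baseball Bat", quantity: 1 },
  ],
};

// In-memory counterpart of the $project stage above:
// _id is dropped, customerId is kept, and numItems is computed from the array length.
function projectPurchase(doc) {
  return {
    customerId: doc.customerId,
    numItems: doc.items.length,
  };
}

const projected = projectPurchase(purchase);
console.log(projected);
```

Because customerId is explicitly included, the items array does not appear in the output even though the stage never mentions it.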

Add Fields to Documents

You can use the $addFields stage to add new fields with calculated values using aggregation operators.

Note

$addFields is similar to $project but does not allow you to include or omit fields.

Example

The following $addFields stage creates a new field named numItems where the value is the number of elements in the items array:

{
  "$addFields": {
    "numItems": { "$sum": { "$size": "$items" } }
  }
}
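
The difference from $project is easiest to see on a concrete document. The sketch below is a plain JavaScript approximation with a hypothetical sample document and a hypothetical helper named addNumItems: every existing field is carried over and only the computed field is added.

```javascript
// Hypothetical purchase document that follows the store.purchases data model.
const purchaseDoc = {
  _id: 123,
  customerId: 24601,
  items: [
    { name: "Baseball", quantity: 5 },
    { name: "Baseball Mitt", quantity: 1 },
  ],
};

// In-memory counterpart of the $addFields stage above:
// copy all existing fields, then append the computed numItems field.
function addNumItems(doc) {
  return { ...doc, numItems: doc.items.length };
}

const withNumItems = addNumItems(purchaseDoc);
console.log(Object.keys(withNumItems)); // _id, customerId, items, numItems
```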

Unwind Array Values

You can use the $unwind stage to aggregate individual elements of array fields. When you unwind an array field, MongoDB copies each document once for each element of the array field but replaces the array value with the array element in each copy.

{
  "$unwind": {
    "path": <Array Field Path>,
    "includeArrayIndex": <string>,
    "preserveNullAndEmptyArrays": <boolean>
  }
}

Example

The following $unwind stage creates a new document for each element of the items array in each document. It also adds a field called itemIndex to each new document that specifies the element’s position index in the original array:

{
  "$unwind": {
    "path": "$items",
    "includeArrayIndex": "itemIndex"
  }
}

Consider the following document from the purchases collection:

{
  _id: 123,
  customerId: 24601,
  items: [
    { name: "Baseball", quantity: 5 },
    { name: "Baseball Mitt", quantity: 1 },
    { name: "Baseball Bat", quantity: 1 },
  ]
}

If we apply the example $unwind stage to this document, the stage outputs the following three documents:

{
  _id: 123,
  customerId: 24601,
  itemIndex: 0,
  items: { name: "Baseball", quantity: 5 }
}, {
  _id: 123,
  customerId: 24601,
  itemIndex: 1,
  items: { name: "Baseball Mitt", quantity: 1 }
}, {
  _id: 123,
  customerId: 24601,
  itemIndex: 2,
  items: { name: "Baseball Bat", quantity: 1 }
}
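
The same expansion can be approximated in plain JavaScript. The helper name unwindItems is hypothetical, and the sketch assumes that items is either an array or missing. Like $unwind with its default settings, it emits nothing for a document whose array is empty or absent.

```javascript
// The example document from the purchases collection shown above.
const purchaseToUnwind = {
  _id: 123,
  customerId: 24601,
  items: [
    { name: "Baseball", quantity: 5 },
    { name: "Baseball Mitt", quantity: 1 },
    { name: "Baseball Bat", quantity: 1 },
  ],
};

// In-memory counterpart of { "path": "$items", "includeArrayIndex": "itemIndex" }:
// copy the document once per array element and replace the array with that element.
function unwindItems(docs) {
  return docs.flatMap(doc =>
    (doc.items || []).map((item, index) => ({
      ...doc,
      items: item,
      itemIndex: index,
    }))
  );
}

const unwound = unwindItems([purchaseToUnwind]);
console.log(unwound.length); // one output document per element of items
```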