Hierarchical Aggregation

Overview

Background

If you collect a large amount of data without pre-aggregating it, and you want access to aggregated information and reports, then you need a method to aggregate the data into a usable form. This document provides an overview of these aggregation patterns and processes.

For clarity, this case study assumes that the incoming event data resides in a collection named events. For details on how you might get the event data into the events collection, see the “Storing Log Data” document. This document continues using that example.

Solution

The first step in the aggregation process is to aggregate event data into the finest required granularity. Then use this aggregation to generate the next coarser level of granularity, and repeat this process until you have generated all required views.

The solution uses several collections: the raw data (i.e. events) collection as well as collections for aggregated hourly, daily, weekly, monthly, and yearly statistics. All aggregations use the mapReduce command, in a hierarchical process. The following figure illustrates the input and output of each job:

Figure: Hierarchy of data aggregation.

Note

Aggregating raw events into an hourly collection is qualitatively different from the operation that aggregates hourly statistics into the daily collection.
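The difference can be seen in a small in-memory sketch. It is not the map-reduce implementation used later in this document; the rollup helper and the sample events are assumptions made for illustration. Raw events each contribute a single length and a count of 1, while hourly aggregates contribute totals and counts that have already been accumulated.

```python
from collections import defaultdict
from datetime import datetime

def rollup(rows, key_of, value_of):
    """Group rows by key_of(row) and sum the (total, count) pairs from value_of(row)."""
    out = defaultdict(lambda: {"total": 0, "count": 0})
    for row in rows:
        total, count = value_of(row)
        bucket = out[key_of(row)]
        bucket["total"] += total
        bucket["count"] += count
    return dict(out)

# Hypothetical sample events in the shape used by this case study.
events = [
    {"userid": "rick", "ts": datetime(2010, 10, 10, 14, 17, 22), "length": 95},
    {"userid": "rick", "ts": datetime(2010, 10, 10, 14, 40, 0), "length": 5},
    {"userid": "rick", "ts": datetime(2010, 10, 10, 15, 5, 0), "length": 50},
]

# Level 1: raw events -> hourly. Each event contributes its own length and a count of 1.
hourly = rollup(
    events,
    key_of=lambda e: (e["userid"], e["ts"].replace(minute=0, second=0, microsecond=0)),
    value_of=lambda e: (e["length"], 1),
)

# Level 2: hourly -> daily. Each input is already an aggregate, so it
# contributes its accumulated total and count rather than a single value.
daily = rollup(
    hourly.items(),
    key_of=lambda kv: (kv[0][0], kv[0][1].replace(hour=0)),
    value_of=lambda kv: (kv[1]["total"], kv[1]["count"]),
)
```

Every level above the hourly one follows the second pattern, which is why the higher-level map functions later in this document can share one template.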

See also

map-reduce, mapReduce, and the Map-Reduce Documentation page for more information on the Map-reduce data aggregation paradigm.

Schema

When designing the schema for event storage, it’s important to track the events included in the aggregation and events that are not yet included.

Relational Approach

A simple tactic from relational databases uses an auto-incremented integer as the primary key. However, this introduces a significant performance penalty for the event logging process because each insert must first fetch a new key, one at a time.

If you can batch your inserts into the events collection, you can use an auto-increment primary key by using the find_and_modify command to generate the _id values, as in the following example:

>>> obj = db.my_sequence.find_and_modify(
...     query={'_id':0},
...     update={'$inc': {'inc': 50}},
...     upsert=True,
...     new=True)
>>> batch_of_ids = range(obj['inc']-50, obj['inc'])

However, in most cases you can simply include a timestamp with each event that you can use to distinguish processed events from unprocessed events.

This example assumes that you are calculating average session length for logged-in users on a website. The events will have the following form:

{
    "userid": "rick",
    "ts": ISODate('2010-10-10T14:17:22Z'),
    "length":95
}

The operations described in the next section will calculate total and average session times for each user at the hour, day, week, month, and year levels. For each aggregation you will want to store the number of sessions so that MongoDB can incrementally recompute the average session times. The aggregate document will resemble the following:

{
   _id: { u: "rick", d: ISODate("2010-10-10T14:00:00Z") },
   value: {
       ts: ISODate('2010-10-10T15:01:00Z'),
       total: 254,
       count: 10,
       mean: 25.4 }
}

Note

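The d timestamp in the _id sub-document identifies the period that the document summarizes, and the ts timestamp in the value sub-document records when the aggregate was last written. Together they allow you to incrementally update documents at various levels of the hierarchy.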
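To see why the aggregate stores count alongside total, consider a minimal Python sketch of merging a new batch into an existing aggregate. The merge function and the new_batch values are hypothetical and only illustrate the arithmetic.

```python
def merge(a, b):
    """Combine two aggregates that each carry a total and a count."""
    total = a["total"] + b["total"]
    count = a["count"] + b["count"]
    mean = total / count if count else 0
    return {"total": total, "count": count, "mean": mean}

existing = {"total": 254, "count": 10, "mean": 25.4}
new_batch = {"total": 46, "count": 2, "mean": 23.0}  # hypothetical new sessions
merged = merge(existing, new_batch)

# Averaging the two means directly would give (25.4 + 23.0) / 2 = 24.2,
# which over-weights the smaller batch. Recomputing from total and count
# gives 300 / 12 = 25.0.
```

Because the mean is always derived from total and count, it never needs to be carried through the reduce step.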

Operations

This section assumes that all events exist in the events collection and have a timestamp. The operations, then, are to aggregate from the events collection into the smallest aggregate (hourly totals) and then to aggregate from the hourly totals into coarser granularity levels. In all cases, these operations will store the aggregation time as a last_run variable.

Creating Hourly Views from Event Collections

Aggregation

Note

Although this solution uses Python and PyMongo to connect with MongoDB, you must pass JavaScript functions (i.e. mapf, reducef, and finalizef) to the mapReduce command.

Begin by creating a map function, as below:

import bson

mapf_hour = bson.Code('''function() {
    // Group by user and by the hour in which the event occurred.
    var key = {
        u: this.userid,
        d: new Date(
            this.ts.getFullYear(),
            this.ts.getMonth(),
            this.ts.getDate(),
            this.ts.getHours(),
            0, 0, 0) };
    emit(
        key,
        {
            total: this.length,
            count: 1,
            mean: 0,
            ts: new Date() });
}''')

In this case, the function emits key-value pairs that contain the data you want to aggregate. The function also emits a ts value that makes it possible to cascade aggregations to coarser-grained aggregations (i.e. hour to day, etc.).
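When testing outside the database, it can help to have a Python counterpart of the map step. The following is a sketch only: hour_key and map_hour are hypothetical helpers, not part of PyMongo, and they assume that ts is a naive datetime in the same time zone the server uses.

```python
from datetime import datetime

def hour_key(event):
    """Python counterpart of the key built in mapf_hour: user plus start of hour."""
    return {
        "u": event["userid"],
        "d": event["ts"].replace(minute=0, second=0, microsecond=0),
    }

def map_hour(event):
    """Return the (key, value) pair that mapf_hour would emit for one event."""
    value = {
        "total": event["length"],
        "count": 1,
        "mean": 0,
        # The JavaScript version emits the current time here; this sketch
        # leaves it unset because the finalize step overwrites it anyway.
        "ts": None,
    }
    return hour_key(event), value

key, value = map_hour(
    {"userid": "rick", "ts": datetime(2010, 10, 10, 14, 17, 22), "length": 95})
```

Note that the JavaScript getFullYear(), getMonth(), getDate(), and getHours() methods return values in the local time zone of the process that runs them, so the hour boundaries depend on how the server is configured.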

Consider the following reduce function:

reducef = bson.Code('''function(key, values) {
    var r = { total: 0, count: 0, mean: 0, ts: null };
    values.forEach(function(v) {
        r.total += v.total;
        r.count += v.count;
    });
    return r;
}''')

The reduce function returns a document in the same format as the output of the map function. This pattern for map and reduce functions makes map-reduce processes easier to test and debug.
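One way to check this property is to port the reduce function to Python and confirm that reducing everything at once gives the same result as reducing a partial batch and feeding that result back in. The reduce_stats function below is such a port, written for illustration; the sample values are made up.

```python
def reduce_stats(key, values):
    """Python port of reducef: sum totals and counts, leave mean and ts unset."""
    r = {"total": 0, "count": 0, "mean": 0, "ts": None}
    for v in values:
        r["total"] += v["total"]
        r["count"] += v["count"]
    return r

values = [
    {"total": 95, "count": 1, "mean": 0, "ts": None},
    {"total": 5, "count": 1, "mean": 0, "ts": None},
    {"total": 50, "count": 1, "mean": 0, "ts": None},
]

# Reduce all values in one call.
all_at_once = reduce_stats("rick", values)

# Reduce a partial batch first, then feed that result back in with the rest.
partial = reduce_stats("rick", values[:2])
in_two_steps = reduce_stats("rick", [partial, values[2]])
```

Both paths should produce the same document, which matters because the reduce function may be invoked more than once for the same key.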

While the reduce function ignores the mean and ts (timestamp) values, the finalize step, as follows, computes these data:

finalizef = bson.Code('''function(key, value) {
    if(value.count > 0) {
       value.mean = value.total / value.count;
    }
    value.ts = new Date();
    return value;
}''')

With the above functions, the map_reduce operation itself will resemble the following:

from datetime import datetime, timedelta

# last_run is the cutoff saved by the previous run of this job.
cutoff = datetime.utcnow() - timedelta(seconds=60)
query = { 'ts': { '$gt': last_run, '$lt': cutoff } }

db.events.map_reduce(
    map=mapf_hour,
    reduce=reducef,
    finalize=finalizef,
    query=query,
    out={ 'reduce': 'stats.hourly' })

last_run = cutoff

The cutoff variable allows you to process all events that occurred after the last run but more than one minute ago. This allows for some delay in logging events. You can safely run this aggregation as often as you like, provided that you update the last_run variable each time.
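The windowing logic can be sketched in plain Python, independent of MongoDB. Here run_window is a hypothetical stand-in for one map_reduce call with the reduce output mode: it folds events whose timestamp lies strictly between last_run and cutoff into an existing stats dictionary.

```python
from datetime import datetime, timedelta

def run_window(events, stats, last_run, cutoff):
    """Fold events with last_run < ts < cutoff into stats; return the new last_run."""
    for e in events:
        if last_run < e["ts"] < cutoff:
            bucket = stats.setdefault(e["userid"], {"total": 0, "count": 0})
            bucket["total"] += e["length"]
            bucket["count"] += 1
    return cutoff

t0 = datetime(2010, 10, 10, 14, 0, 0)
events = [
    {"userid": "rick", "ts": t0 + timedelta(minutes=1), "length": 95},
    {"userid": "rick", "ts": t0 + timedelta(minutes=6), "length": 5},
]

stats = {}
last_run = t0

# First run sees only the first event; the second is newer than the cutoff.
last_run = run_window(events, stats, last_run, cutoff=t0 + timedelta(minutes=5))
after_first = dict(stats["rick"])

# Second run starts where the first stopped, so the first event is not re-counted.
last_run = run_window(events, stats, last_run, cutoff=t0 + timedelta(minutes=10))
```

If last_run were not advanced between runs, the second run would select the first event again and count it twice.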

Indexing

Create an index on the timestamp (i.e. the ts field) to support the query selection of the map_reduce operation. Use the following operation at the Python/PyMongo console:

>>> db.events.ensure_index('ts')

Deriving Day-Level Data

Aggregation

To calculate daily statistics, use the hourly statistics as input. Begin with the following map function:

mapf_day = bson.Code('''function() {
    var key = {
        u: this._id.u,
        d: new Date(
            this._id.d.getFullYear(),
            this._id.d.getMonth(),
            this._id.d.getDate(),
            0, 0, 0, 0) };
    emit(
        key,
        {
            total: this.value.total,
            count: this.value.count,
            mean: 0,
            ts: null });
}''')

The map function for deriving day-level data differs from the initial aggregation above in the following ways:

  • the aggregation key is (userid, date) rather than (userid, hour), to support daily aggregation.

  • the values emitted (i.e. by emit()) are the total and count values from the hourly aggregates rather than properties from event documents.

    This is the case for all the higher-level aggregation operations.

Because the output of this map function has the same format as the output of the previous map function, you can use the same reduce and finalize functions.

The actual code driving this level of aggregation is as follows:

cutoff = datetime.utcnow() - timedelta(seconds=60)
query = { 'value.ts': { '$gt': last_run, '$lt': cutoff } }

db.stats.hourly.map_reduce(
    map=mapf_day,
    reduce=reducef,
    finalize=finalizef,
    query=query,
    out={ 'reduce': 'stats.daily' })

last_run = cutoff

There are a couple of things to note here. First of all, the query is not on ts now, but value.ts, the timestamp written during the finalization of the hourly aggregates. Also note that you are, in fact, aggregating from the stats.hourly collection into the stats.daily collection.

Indexing

Because you will regularly run a query that selects on the value.ts field, you may wish to create an index to support it. Use the following operation in the Python/PyMongo shell to create this index:

>>> db.stats.hourly.ensure_index('value.ts')

Weekly and Monthly Aggregation

Aggregation

You can use the aggregated day-level data to generate weekly and monthly statistics. A map function for generating weekly data follows:

mapf_week = bson.Code('''function() {
    var key = {
        u: this._id.u,
        d: new Date(
            this._id.d.valueOf()
            - this._id.d.getDay()*24*60*60*1000) };
    emit(
        key,
        {
            total: this.value.total,
            count: this.value.count,
            mean: 0,
            ts: null });
}''')

Here, to get the group key, the function takes the current day and subtracts days until it reaches the beginning of the week. In the monthly map function, you’ll use the first day of the month as the group key, as follows:

mapf_month = bson.Code('''function() {
    var key = {
        u: this._id.u,
        d: new Date(
            this._id.d.getFullYear(),
            this._id.d.getMonth(),
            1, 0, 0, 0, 0) };
    emit(
        key,
        {
            total: this.value.total,
            count: this.value.count,
            mean: 0,
            ts: null });
}''')

These map functions are identical to each other except for the date calculation.
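For checking the date arithmetic outside the database, the two calculations can be mirrored in Python. This is a sketch with hypothetical helper names; it assumes the input is a day-level key at midnight, and it treats Sunday as the first day of the week because JavaScript's getDay() returns 0 for Sunday.

```python
from datetime import datetime, timedelta

def week_start(day):
    """Start of the week containing day, with Sunday as the first day."""
    # Python's weekday() has Monday=0; shift so that Sunday=0 like getDay().
    days_since_sunday = (day.weekday() + 1) % 7
    return day - timedelta(days=days_since_sunday)

def month_start(day):
    """First day of the month containing day."""
    return day.replace(day=1)

sunday = datetime(2010, 10, 10)      # 10 October 2010 was a Sunday
wednesday = datetime(2010, 10, 13)
saturday = datetime(2010, 10, 16)
```

All three sample days belong to the week beginning on 10 October 2010 and to the month beginning on 1 October 2010.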

Indexing

Create additional indexes on the value.ts field to support the weekly, monthly, and yearly aggregation operations, which read from the stats.daily and stats.monthly collections. Use the following operations in the Python/PyMongo shell:

>>> db.stats.daily.ensure_index('value.ts')
>>> db.stats.monthly.ensure_index('value.ts')

Refactor Map Functions

Use Python’s string interpolation to refactor the map function definitions as follows:

mapf_hierarchical = '''function() {
    var key = {
        u: this._id.u,
        d: %s };
    emit(
        key,
        {
            total: this.value.total,
            count: this.value.count,
            mean: 0,
            ts: null });
}'''

mapf_day = bson.Code(
    mapf_hierarchical % '''new Date(
            this._id.d.getFullYear(),
            this._id.d.getMonth(),
            this._id.d.getDate(),
            0, 0, 0, 0)''')

mapf_week = bson.Code(
    mapf_hierarchical % '''new Date(
            this._id.d.valueOf()
            - this._id.d.getDay()*24*60*60*1000)''')

mapf_month = bson.Code(
    mapf_hierarchical % '''new Date(
            this._id.d.getFullYear(),
            this._id.d.getMonth(),
            1, 0, 0, 0, 0)''')

mapf_year = bson.Code(
    mapf_hierarchical % '''new Date(
            this._id.d.getFullYear(),
            0, 1, 0, 0, 0, 0)''')
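A variation on the same idea, offered as a sketch rather than as the recommended form, keeps the date expressions in a dictionary and builds all map sources in one pass. MAP_TEMPLATE, DATE_EXPRS, and map_sources are names introduced here for illustration.

```python
# Same shape as mapf_hierarchical; the date expression fills the %s slot.
MAP_TEMPLATE = '''function() {
    var key = { u: this._id.u, d: %s };
    emit(key, { total: this.value.total, count: this.value.count,
                mean: 0, ts: null });
}'''

# One JavaScript date expression per aggregation level.
DATE_EXPRS = {
    'day': 'new Date(this._id.d.getFullYear(), this._id.d.getMonth(), '
           'this._id.d.getDate(), 0, 0, 0, 0)',
    'month': 'new Date(this._id.d.getFullYear(), this._id.d.getMonth(), '
             '1, 0, 0, 0, 0)',
    'year': 'new Date(this._id.d.getFullYear(), 0, 1, 0, 0, 0, 0)',
}

# Build the JavaScript source for every level from the single template.
map_sources = {level: MAP_TEMPLATE % expr for level, expr in DATE_EXPRS.items()}
```

Each resulting string can be wrapped in bson.Code before use. One caveat of %-style interpolation is that any literal percent sign in the JavaScript template would need to be written as %%.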

You can create an h_aggregate function to wrap the map_reduce operation, as below, to reduce code duplication:

def h_aggregate(icollection, ocollection, mapf, cutoff, last_run):
     query = { 'value.ts': { '$gt': last_run, '$lt': cutoff } }
     icollection.map_reduce(
         map=mapf,
         reduce=reducef,
         finalize=finalizef,
         query=query,
         out={ 'reduce': ocollection.name })

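With h_aggregate defined, you can perform all aggregation operations above the hourly level with it. Because h_aggregate queries on value.ts, while raw events store their timestamp in ts, the hourly step keeps its own map_reduce call:

cutoff = datetime.utcnow() - timedelta(seconds=60)

# Raw events carry 'ts' rather than 'value.ts', so query them directly.
db.events.map_reduce(
    map=mapf_hour,
    reduce=reducef,
    finalize=finalizef,
    query={ 'ts': { '$gt': last_run, '$lt': cutoff } },
    out={ 'reduce': 'stats.hourly' })

h_aggregate(db.stats.hourly, db.stats.daily, mapf_day, cutoff, last_run)
h_aggregate(db.stats.daily, db.stats.weekly, mapf_week, cutoff, last_run)
h_aggregate(db.stats.daily, db.stats.monthly, mapf_month, cutoff, last_run)
h_aggregate(db.stats.monthly, db.stats.yearly, mapf_year, cutoff, last_run)

last_run = cutoff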

As long as you save and restore the last_run variable between aggregations, you can run these aggregations as often as you like since each aggregation operation is incremental.
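How you save and restore last_run is up to you. As one possibility, the following sketch stores it in a small JSON file; load_last_run, save_last_run, and the file location are all assumptions made for this example, and a MongoDB collection would work equally well.

```python
import json
import os
import tempfile
from datetime import datetime

def load_last_run(path, default):
    """Return the saved timestamp, or default if no run has been recorded yet."""
    try:
        with open(path) as f:
            return datetime.fromisoformat(json.load(f)["last_run"])
    except FileNotFoundError:
        return default

def save_last_run(path, when):
    """Persist the timestamp of the most recent aggregation run."""
    # Write to a temporary file and rename it, so that a crash cannot
    # leave a half-written state file behind.
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"last_run": when.isoformat()}, f)
    os.replace(tmp, path)

# Example round trip using a throwaway directory.
state_file = os.path.join(tempfile.mkdtemp(), "last_run.json")
epoch = datetime(1970, 1, 1)

first = load_last_run(state_file, epoch)           # no file yet, so the default
save_last_run(state_file, datetime(2010, 10, 10, 15, 1, 0))
second = load_last_run(state_file, epoch)          # the value just saved
```

Saving last_run only after all aggregation calls have completed means that a failed run is retried from the same starting point.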

Sharding

Ensure that you choose a shard key that is not the incoming timestamp, but rather something that varies significantly in the most recent documents. In the example above, consider using the userid as the most significant part of the shard key.
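The effect can be illustrated with a toy model of range-based chunks. This is a simplified sketch, not a description of MongoDB's actual chunk management: the split points are hypothetical, and timestamps are reduced to integers.

```python
from bisect import bisect_right

def chunk_index(split_points, key):
    """Return the index of the range-based chunk that holds key."""
    return bisect_right(split_points, key)

# Hypothetical split points for two candidate shard keys.
ts_splits = [100, 200, 300]        # shard key: ts only
user_splits = [("h",), ("p",)]     # shard key: (userid, ts)

# Three users log events at nearly the same, most recent, time.
new_events = [("alice", 301), ("mary", 302), ("rick", 303)]

# With ts alone, every new event lands in the last chunk.
chunks_by_ts = {chunk_index(ts_splits, ts) for _, ts in new_events}

# With userid as the leading field, the same events spread across chunks.
chunks_by_user = {chunk_index(user_splits, (user, ts)) for user, ts in new_events}
```

In this model the timestamp-only key sends all current writes to a single chunk, while the key that leads with userid distributes them.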

To prevent a single, active user from creating a large chunk that MongoDB cannot split, use a compound shard key with (userid, ts) on the events collection. Consider the following:

>>> db.command('shardCollection', 'events',
...     key={ 'userid': 1, 'ts' : 1 })
{ "collectionsharded": "events", "ok" : 1 }

To shard the aggregated collections, you must use the _id field. You can issue the following group of shard operations in the Python/PyMongo shell:

# Pass the shard key as the 'key' keyword argument so that PyMongo
# includes it in the command document.
db.command('shardCollection', 'stats.daily',
     key={ '_id': 1 })
db.command('shardCollection', 'stats.weekly',
     key={ '_id': 1 })
db.command('shardCollection', 'stats.monthly',
     key={ '_id': 1 })
db.command('shardCollection', 'stats.yearly',
     key={ '_id': 1 })

You should also update the h_aggregate map-reduce wrapper to support sharded output: add 'sharded': True to the out argument. See the full sharded h_aggregate function:

def h_aggregate(icollection, ocollection, mapf, cutoff, last_run):
     query = { 'value.ts': { '$gt': last_run, '$lt': cutoff } }
     icollection.map_reduce(
         map=mapf,
         reduce=reducef,
         finalize=finalizef,
         query=query,
         out={ 'reduce': ocollection.name, 'sharded': True })