
MongoDB Aggregation Pipeline: Advanced Patterns and Optimization

MongoDB’s aggregation pipeline is one of its most powerful features, enabling complex data transformations and analysis. This guide covers advanced patterns, optimization techniques, and real-world use cases.

Understanding the Aggregation Pipeline

The aggregation pipeline processes documents through a sequence of stages, where each stage transforms the document stream.

Basic Pipeline Structure

db.collection.aggregate([
  { $match: { status: "active" } },      // Stage 1: Filter
  { $group: { _id: "$category", count: { $sum: 1 } } },  // Stage 2: Group
  { $sort: { count: -1 } }               // Stage 3: Sort
])

Each stage receives documents from the previous stage, transforms them, and passes results to the next stage.
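To build intuition for this stage-by-stage model, here is the same three-stage pipeline sketched in plain Python over an in-memory list of dicts. This is an illustration of the semantics only, not how the server executes it; the sample documents are made up.

```python
docs = [
    {"status": "active", "category": "a"},
    {"status": "inactive", "category": "a"},
    {"status": "active", "category": "b"},
    {"status": "active", "category": "a"},
]

# Stage 1: $match -- keep only documents that pass the filter
matched = [d for d in docs if d["status"] == "active"]

# Stage 2: $group -- count documents per category
counts = {}
for d in matched:
    counts[d["category"]] = counts.get(d["category"], 0) + 1

# Stage 3: $sort -- order the groups by count, descending
result = sorted(
    ({"_id": k, "count": n} for k, n in counts.items()),
    key=lambda g: g["count"],
    reverse=True,
)
```

Each variable above corresponds to the document stream flowing out of one stage and into the next.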

Essential Pipeline Stages

$match: Filtering Documents

Place $match as early as possible to reduce the number of documents processed by subsequent stages:

// Good: Filter early
db.orders.aggregate([
  { $match: { status: "completed", total: { $gte: 100 } } },
  { $group: { _id: "$customerId", totalSpent: { $sum: "$total" } } }
])

// Bad: filters only after grouping, so every order is processed
db.orders.aggregate([
  { $group: { _id: "$customerId", totalSpent: { $sum: "$total" } } },
  { $match: { totalSpent: { $gte: 1000 } } }
])

Performance tip: $match can use indexes only when it runs at the start of the pipeline; once an earlier stage has reshaped the documents, a later $match must filter them in memory.

$project: Reshaping Documents

Control which fields pass through the pipeline:

db.users.aggregate([
  {
    $project: {
      _id: 0,                    // Exclude _id
      name: 1,                   // Include name
      email: 1,                  // Include email
      fullName: {                // Computed field
        $concat: ["$firstName", " ", "$lastName"]
      },
      ageInMonths: {             // Math operation
        $multiply: ["$age", 12]
      }
    }
  }
])

When to use: Reduce document size before expensive operations like $group or $lookup.

$group: Aggregating Data

Group documents by a key and perform calculations:

db.sales.aggregate([
  {
    $group: {
      _id: {                      // Compound group key
        year: { $year: "$date" },
        month: { $month: "$date" }
      },
      totalRevenue: { $sum: "$amount" },
      avgOrderValue: { $avg: "$amount" },
      orderCount: { $sum: 1 },
      maxOrder: { $max: "$amount" },
      minOrder: { $min: "$amount" },
      uniqueCustomers: { $addToSet: "$customerId" }
    }
  }
])

Common accumulators: $sum, $avg, $min, $max, $first, $last, $push (append every value to an array), and $addToSet (collect only unique values).
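The compound-key grouping above can be sketched in plain Python; the sample sales data is invented for illustration.

```python
from collections import defaultdict
from datetime import datetime

sales = [
    {"date": datetime(2024, 1, 5), "amount": 100},
    {"date": datetime(2024, 1, 20), "amount": 300},
    {"date": datetime(2024, 2, 3), "amount": 50},
]

# Compound group key: (year, month), like the _id document above
groups = defaultdict(list)
for s in sales:
    groups[(s["date"].year, s["date"].month)].append(s["amount"])

# One output document per group, with the accumulated values
report = {
    key: {
        "totalRevenue": sum(amounts),
        "avgOrderValue": sum(amounts) / len(amounts),
        "orderCount": len(amounts),
    }
    for key, amounts in groups.items()
}
```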

$lookup: Joining Collections

Perform left outer joins between collections:

db.orders.aggregate([
  {
    $lookup: {
      from: "customers",         // Collection to join
      localField: "customerId",  // Field in orders
      foreignField: "_id",       // Field in customers
      as: "customerInfo"         // Output array name
    }
  },
  {
    $unwind: "$customerInfo"     // Flatten the single-element array into an object
  }
])

Performance warning: $lookup is expensive. Use indexes on both localField and foreignField, and filter data before the lookup when possible.

$unwind: Deconstructing Arrays

Expand array fields into separate documents:

// Document before $unwind:
// { _id: 1, tags: ["mongodb", "database", "nosql"] }

db.articles.aggregate([
  { $unwind: "$tags" }
])

// After $unwind (3 documents):
// { _id: 1, tags: "mongodb" }
// { _id: 1, tags: "database" }
// { _id: 1, tags: "nosql" }

Useful for analyzing array elements:

db.articles.aggregate([
  { $unwind: "$tags" },
  { $group: { _id: "$tags", count: { $sum: 1 } } },
  { $sort: { count: -1 } }
])
// Result: Most common tags across all articles
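The unwind-then-count pattern can be sketched in plain Python, using made-up articles:

```python
from collections import Counter

articles = [
    {"_id": 1, "tags": ["mongodb", "database", "nosql"]},
    {"_id": 2, "tags": ["mongodb", "indexes"]},
]

# $unwind: emit one document per array element
unwound = [
    {"_id": a["_id"], "tags": tag}
    for a in articles
    for tag in a["tags"]
]

# $group + $sort: count each tag, most common first
tag_counts = Counter(d["tags"] for d in unwound).most_common()
```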

Advanced Pipeline Patterns

Pattern 1: Multi-Stage Filtering

Combine index-friendly and complex filters:

db.products.aggregate([
  // Stage 1: Index-optimized filter
  { $match: { category: "electronics", price: { $gte: 100 } } },

  // Stage 2: Add computed fields
  { $addFields: { discountedPrice: { $multiply: ["$price", 0.9] } } },

  // Stage 3: Complex filter on computed field
  { $match: { discountedPrice: { $lte: 500 } } }
])
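The add-then-filter semantics can be sketched in plain Python (sample products are hypothetical):

```python
products = [
    {"name": "TV", "price": 400},
    {"name": "Laptop", "price": 800},
    {"name": "Camera", "price": 550},
]

# $addFields: attach the computed discountedPrice to each document
with_discount = [dict(p, discountedPrice=p["price"] * 0.9) for p in products]

# Second $match: filter on the computed field (no index can help here)
affordable = [p for p in with_discount if p["discountedPrice"] <= 500]
```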

Pattern 2: Conditional Aggregation

Use $cond for conditional logic:

db.orders.aggregate([
  {
    $group: {
      _id: "$customerId",
      premiumOrders: {
        $sum: {
          $cond: [
            { $gte: ["$total", 1000] },  // Condition
            1,                            // Value if true
            0                             // Value if false
          ]
        }
      },
      standardOrders: {
        $sum: {
          $cond: [{ $lt: ["$total", 1000] }, 1, 0]
        }
      },
      totalRevenue: { $sum: "$total" }
    }
  }
])
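The $cond-inside-$sum trick is just a conditional counter. A plain-Python sketch of the same grouping, with invented orders:

```python
orders = [
    {"customerId": "c1", "total": 1500},
    {"customerId": "c1", "total": 200},
    {"customerId": "c2", "total": 999},
]

summary = {}
for o in orders:
    s = summary.setdefault(
        o["customerId"],
        {"premiumOrders": 0, "standardOrders": 0, "totalRevenue": 0},
    )
    # $cond: each order contributes 1 to one counter and 0 to the other
    if o["total"] >= 1000:
        s["premiumOrders"] += 1
    else:
        s["standardOrders"] += 1
    s["totalRevenue"] += o["total"]
```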

Pattern 3: Bucketing Data

Group data into ranges using $bucket:

db.users.aggregate([
  {
    $bucket: {
      groupBy: "$age",
      boundaries: [0, 18, 30, 50, 65, 100],
      default: "Other",
      output: {
        count: { $sum: 1 },
        users: { $push: "$name" }
      }
    }
  }
])

// Result:
// { _id: 18, count: 245, users: [...] }  // Ages 18-29
// { _id: 30, count: 412, users: [...] }  // Ages 30-49
// { _id: 50, count: 189, users: [...] }  // Ages 50-64
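The bucket assignment rule (lower bound inclusive, upper bound exclusive) can be sketched in plain Python with bisect; the default "Other" bucket is omitted for brevity, and the sample users are made up:

```python
import bisect

boundaries = [0, 18, 30, 50, 65, 100]
users = [
    {"name": "Ana", "age": 25},
    {"name": "Bo", "age": 34},
    {"name": "Cy", "age": 34},
    {"name": "Dee", "age": 70},
]

buckets = {}
for u in users:
    # Each document lands in the bucket whose _id is the largest
    # boundary <= its age; the next boundary is the exclusive upper bound
    i = bisect.bisect_right(boundaries, u["age"]) - 1
    if 0 <= i < len(boundaries) - 1:
        b = buckets.setdefault(boundaries[i], {"count": 0, "users": []})
        b["count"] += 1
        b["users"].append(u["name"])
```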

Pattern 4: Time-Series Analysis

Analyze data over time windows:

db.metrics.aggregate([
  {
    $match: {
      timestamp: {
        $gte: ISODate("2024-01-01"),
        $lt: ISODate("2025-01-01")
      }
    }
  },
  {
    $group: {
      _id: {
        year: { $year: "$timestamp" },
        month: { $month: "$timestamp" },
        day: { $dayOfMonth: "$timestamp" }
      },
      avgValue: { $avg: "$value" },
      maxValue: { $max: "$value" },
      minValue: { $min: "$value" },
      dataPoints: { $sum: 1 }
    }
  },
  {
    $sort: { "_id.year": 1, "_id.month": 1, "_id.day": 1 }
  }
])

Pattern 5: Top N per Group

Find top items within each group:

db.products.aggregate([
  {
    $sort: { category: 1, sales: -1 }  // Sort by category, then sales descending
  },
  {
    $group: {
      _id: "$category",
      topProducts: {
        $push: {
          name: "$name",
          sales: "$sales"
        }
      }
    }
  },
  {
    $project: {
      topProducts: { $slice: ["$topProducts", 5] }  // Keep only top 5
    }
  }
])
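The sort-push-slice idea can be sketched in plain Python over a hypothetical product list:

```python
from collections import defaultdict

products = [
    {"category": "phones", "name": "A", "sales": 50},
    {"category": "phones", "name": "B", "sales": 90},
    {"category": "laptops", "name": "C", "sales": 40},
    {"category": "phones", "name": "D", "sales": 70},
]

# Sort by sales descending, $push into per-category lists, then $slice
grouped = defaultdict(list)
for p in sorted(products, key=lambda p: p["sales"], reverse=True):
    grouped[p["category"]].append({"name": p["name"], "sales": p["sales"]})

top_per_category = {cat: items[:2] for cat, items in grouped.items()}  # top 2
```

Because the documents arrive at $group already sorted, each pushed array is in descending sales order and slicing it keeps the top N.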

Optimization Techniques

1. Use Indexes Strategically

// Create compound index for aggregation
db.orders.createIndex({ status: 1, createdAt: 1 })

// This pipeline uses the index:
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $sort: { createdAt: -1 } },
  { $limit: 100 }
])

// Verify with explain():
db.orders.explain("executionStats").aggregate([...])

2. Limit Early and Often

db.articles.aggregate([
  { $match: { published: true } },
  { $sort: { views: -1 } },
  { $limit: 10 },                    // Limit before expensive operations
  {
    $lookup: {
      from: "authors",
      localField: "authorId",
      foreignField: "_id",
      as: "author"
    }
  }
])

3. Use $project to Reduce Document Size

db.logs.aggregate([
  {
    $project: {
      _id: 0,
      timestamp: 1,
      level: 1,
      message: 1
      // Exclude large fields like `stack_trace` or `metadata`
    }
  },
  {
    $group: {
      _id: { $hour: "$timestamp" },
      errorCount: {
        $sum: { $cond: [{ $eq: ["$level", "error"] }, 1, 0] }
      }
    }
  }
])

4. allowDiskUse for Large Datasets

db.hugecollection.aggregate(
  [...],
  { allowDiskUse: true }  // Enable disk usage for sorts exceeding 100MB memory
)

Warning: spilling to disk is much slower than in-memory processing. Prefer restructuring the pipeline so it stays under the memory limit.

5. Pipeline Order Matters

Optimal stage ordering:

  1. $match (with indexes)
  2. $sort (with indexes)
  3. $limit
  4. $project (reduce document size)
  5. $unwind
  6. $group
  7. $lookup
  8. $match (on computed fields)
  9. $sort (on computed fields)

Real-World Use Cases

E-Commerce: Customer Lifetime Value

db.orders.aggregate([
  {
    $match: {
      status: "completed",
      createdAt: { $gte: ISODate("2024-01-01") }
    }
  },
  {
    $group: {
      _id: "$customerId",
      totalSpent: { $sum: "$total" },
      orderCount: { $sum: 1 },
      avgOrderValue: { $avg: "$total" },
      firstOrder: { $min: "$createdAt" },
      lastOrder: { $max: "$createdAt" }
    }
  },
  {
    $addFields: {
      customerLifetimeDays: {
        $divide: [
          { $subtract: ["$lastOrder", "$firstOrder"] },
          1000 * 60 * 60 * 24
        ]
      }
    }
  },
  {
    $match: { totalSpent: { $gte: 1000 } }
  },
  {
    $sort: { totalSpent: -1 }
  }
])
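The divisor 1000 * 60 * 60 * 24 works because $subtract applied to two dates yields milliseconds. The same conversion, sketched with Python's datetime and made-up order dates:

```python
from datetime import datetime

first_order = datetime(2024, 1, 10)
last_order = datetime(2024, 4, 10)

# $subtract on two dates yields milliseconds, hence the divisor
ms = (last_order - first_order).total_seconds() * 1000
lifetime_days = ms / (1000 * 60 * 60 * 24)
```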

Analytics: Cohort Analysis

db.users.aggregate([
  {
    $addFields: {
      cohortMonth: {
        $dateToString: {
          format: "%Y-%m",
          date: "$registeredAt"
        }
      }
    }
  },
  {
    $lookup: {
      from: "activities",
      let: { userId: "$_id" },
      pipeline: [
        { $match: { $expr: { $eq: ["$userId", "$$userId"] } } },
        {
          $group: {
            _id: {
              $dateToString: {
                format: "%Y-%m",
                date: "$timestamp"
              }
            },
            activityCount: { $sum: 1 }
          }
        }
      ],
      as: "monthlyActivity"
    }
  },
  {
    $group: {
      _id: "$cohortMonth",
      totalUsers: { $sum: 1 },
      activeUsers: {
        $sum: {
          $cond: [{ $gt: [{ $size: "$monthlyActivity" }, 0] }, 1, 0]
        }
      }
    }
  },
  {
    $project: {
      totalUsers: 1,
      activeUsers: 1,
      retentionRate: {
        $multiply: [
          { $divide: ["$activeUsers", "$totalUsers"] },
          100
        ]
      }
    }
  }
])

Common Pitfalls

1. Unbounded $lookup

// Bad: Joins entire collections
db.orders.aggregate([
  {
    $lookup: {
      from: "products",  // Large collection
      localField: "productId",
      foreignField: "_id",
      as: "product"
    }
  }
])

// Better: Filter in lookup pipeline
db.orders.aggregate([
  {
    $lookup: {
      from: "products",
      let: { productId: "$productId" },
      pipeline: [
        { $match: { $expr: { $eq: ["$_id", "$$productId"] }, inStock: true } },
        { $project: { name: 1, price: 1 } }
      ],
      as: "product"
    }
  }
])

2. Missing Indexes on $match

Always create indexes for fields in $match:

// Create index first
db.events.createIndex({ type: 1, timestamp: -1 })

// Then aggregate
db.events.aggregate([
  { $match: { type: "click", timestamp: { $gte: recentDate } } }
])

3. Memory Limits

Blocking stages such as $sort and $group are limited to 100MB of RAM each by default:

// This might fail on large datasets
db.huge.aggregate([
  { $group: { _id: "$category", items: { $push: "$$ROOT" } } }
])

// Solution: Use allowDiskUse or redesign pipeline
db.huge.aggregate(
  [{ $group: { _id: "$category", count: { $sum: 1 } } }],
  { allowDiskUse: true }
)

Conclusion

MongoDB’s aggregation pipeline is a powerful tool for data analysis and transformation. Start with simple pipelines, measure performance, and add complexity incrementally; used correctly, the framework scales efficiently to collections with billions of documents.

Remember: The best pipeline is the one that returns the right data with the fewest stages.

