MongoDB Aggregation Pipeline: Advanced Data Processing
Author: Mamun Rashid (@mmncit)
Welcome to Part 6 of our MongoDB Zero to Hero series. After mastering indexing and performance, it's time to explore MongoDB's most powerful feature for data processing: the Aggregation Pipeline.
What is the Aggregation Pipeline?
The Aggregation Pipeline is a framework for data aggregation modeled on the concept of data processing pipelines. Documents enter a multi-stage pipeline that transforms the documents into aggregated results.
Think of it like an assembly line where each stage performs a specific operation on the data before passing it to the next stage.
// Basic pipeline structure
db.collection.aggregate([
{ stage1 },
{ stage2 },
{ stage3 },
// ... more stages
]);
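To make the assembly-line picture concrete, here is a minimal plain-JavaScript sketch (not MongoDB code) in which each stage is a function from an array of documents to a new array, and the pipeline feeds each stage's output into the next. The helper names `matchStage`, `sortStage`, `limitStage`, and `runPipeline` are hypothetical and exist only for this illustration.

```javascript
// Each "stage" takes an array of documents and returns a new array.
const matchStage = (predicate) => (docs) => docs.filter(predicate);
const sortStage = (compare) => (docs) => [...docs].sort(compare);
const limitStage = (n) => (docs) => docs.slice(0, n);

// Run the stages in order, feeding each output into the next stage.
const runPipeline = (docs, stages) =>
  stages.reduce((current, stage) => stage(current), docs);

const pipelineDemoDocs = [
  { product: 'Laptop', price: 1200 },
  { product: 'Mouse', price: 25 },
  { product: 'Monitor', price: 400 },
];

const pipelineDemoResult = runPipeline(pipelineDemoDocs, [
  matchStage((doc) => doc.price >= 100), // keeps Laptop and Monitor
  sortStage((a, b) => a.price - b.price), // cheapest first
  limitStage(1), // keeps only the Monitor
]);

console.log(pipelineDemoResult);
```

The real pipeline works on the same principle, except that the stages are declarative documents such as `{ $match: ... }` and the server decides how to execute them.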
Sample Data Setup
Let's create sample data for our examples:
// Create sample sales data
// String _id values keep the examples readable; ObjectId() only accepts a 24-character hex string
db.sales.insertMany([
{
_id: 'sales1',
product: 'Laptop',
category: 'Electronics',
price: 1200,
quantity: 2,
saleDate: ISODate('2024-01-15'),
customer: {
name: 'Alice Johnson',
email: 'alice@example.com',
region: 'North',
},
salesperson: 'John Doe',
},
{
_id: 'sales2',
product: 'Mouse',
category: 'Electronics',
price: 25,
quantity: 5,
saleDate: ISODate('2024-01-16'),
customer: {
name: 'Bob Smith',
email: 'bob@example.com',
region: 'South',
},
salesperson: 'Jane Smith',
},
{
_id: 'sales3',
product: 'Desk Chair',
category: 'Furniture',
price: 300,
quantity: 1,
saleDate: ISODate('2024-01-17'),
customer: {
name: 'Carol Davis',
email: 'carol@example.com',
region: 'North',
},
salesperson: 'John Doe',
},
{
_id: 'sales4',
product: 'Monitor',
category: 'Electronics',
price: 400,
quantity: 3,
saleDate: ISODate('2024-01-18'),
customer: {
name: 'David Wilson',
email: 'david@example.com',
region: 'West',
},
salesperson: 'Jane Smith',
},
{
_id: 'sales5',
product: 'Keyboard',
category: 'Electronics',
price: 75,
quantity: 4,
saleDate: ISODate('2024-01-19'),
customer: {
name: 'Emma Brown',
email: 'emma@example.com',
region: 'East',
},
salesperson: 'John Doe',
},
]);
Core Aggregation Stages
1. $match - Filtering Documents
Filters documents, similar to find():
// Find electronics sales
db.sales.aggregate([{ $match: { category: 'Electronics' } }]);
// Multiple conditions
db.sales.aggregate([
{
$match: {
category: 'Electronics',
price: { $gte: 50 },
},
},
]);
// Complex matching with date ranges
db.sales.aggregate([
{
$match: {
saleDate: {
$gte: ISODate('2024-01-16'),
$lt: ISODate('2024-01-19'),
},
},
},
]);
// Match with regex
db.sales.aggregate([
{ $match: { product: /^L/ } }, // Products starting with 'L'
]);
2. $project - Shaping Output
Select, add, or transform fields:
// Select specific fields
db.sales.aggregate([
{
$project: {
product: 1,
price: 1,
quantity: 1,
_id: 0, // Exclude _id
},
},
]);
// Add calculated fields
db.sales.aggregate([
{
$project: {
product: 1,
price: 1,
quantity: 1,
total: { $multiply: ['$price', '$quantity'] }, // Calculate total
customerName: '$customer.name', // Extract nested field
},
},
]);
// Complex transformations
db.sales.aggregate([
{
$project: {
product: 1,
priceRange: {
$cond: [
{ $lt: ['$price', 100] },
'Low',
{
$cond: [{ $lt: ['$price', 500] }, 'Medium', 'High'],
},
],
},
saleMonth: { $month: '$saleDate' },
saleYear: { $year: '$saleDate' },
},
},
]);
3. $group - Grouping and Aggregating
Group documents and perform calculations:
// Group by category and count
db.sales.aggregate([
{
$group: {
_id: '$category',
count: { $sum: 1 },
totalRevenue: { $sum: { $multiply: ['$price', '$quantity'] } },
},
},
]);
// Group by multiple fields
db.sales.aggregate([
{
$group: {
_id: {
category: '$category',
region: '$customer.region',
},
averagePrice: { $avg: '$price' },
maxPrice: { $max: '$price' },
minPrice: { $min: '$price' },
},
},
]);
// Group by date parts
db.sales.aggregate([
{
$group: {
_id: {
year: { $year: '$saleDate' },
month: { $month: '$saleDate' },
},
monthlySales: { $sum: { $multiply: ['$price', '$quantity'] } },
avgOrderValue: { $avg: { $multiply: ['$price', '$quantity'] } },
},
},
]);
// Collect values into arrays
db.sales.aggregate([
{
$group: {
_id: '$salesperson',
products: { $push: '$product' }, // Array of all products
uniqueProducts: { $addToSet: '$product' }, // Array of unique products
totalSales: { $sum: { $multiply: ['$price', '$quantity'] } },
},
},
]);
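As a rough mental model of what the accumulators do, the following plain-JavaScript sketch (not MongoDB code) groups an in-memory array by category and mirrors `$sum`, `$push`, and `$addToSet`. The data is a hypothetical variation of the sample sales with one repeated product, so that the difference between `$push` and `$addToSet` is visible.

```javascript
// Hypothetical in-memory data; the second Mouse sale is added for illustration.
const groupDemoSales = [
  { product: 'Laptop', category: 'Electronics', price: 1200, quantity: 2 },
  { product: 'Mouse', category: 'Electronics', price: 25, quantity: 5 },
  { product: 'Desk Chair', category: 'Furniture', price: 300, quantity: 1 },
  { product: 'Mouse', category: 'Electronics', price: 25, quantity: 1 },
];

const groupDemoResult = new Map();
for (const sale of groupDemoSales) {
  if (!groupDemoResult.has(sale.category)) {
    groupDemoResult.set(sale.category, {
      _id: sale.category,
      count: 0,
      totalRevenue: 0,
      products: [],
      uniqueProducts: [],
    });
  }
  const group = groupDemoResult.get(sale.category);
  group.count += 1; // like { $sum: 1 }
  group.totalRevenue += sale.price * sale.quantity; // like { $sum: { $multiply: [...] } }
  group.products.push(sale.product); // like $push: keeps duplicates
  if (!group.uniqueProducts.includes(sale.product)) {
    group.uniqueProducts.push(sale.product); // like $addToSet: no duplicates
  }
}

console.log([...groupDemoResult.values()]);
```

One difference worth keeping in mind: MongoDB does not specify the order of elements produced by `$addToSet`, whereas this sketch happens to keep first-seen order.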
4. $sort - Sorting Results
Sort documents by one or more fields:
// Sort by single field
db.sales.aggregate([
{ $match: { category: 'Electronics' } },
{ $sort: { price: -1 } }, // Descending order
]);
// Sort by multiple fields
db.sales.aggregate([
{
$group: {
_id: '$category',
totalRevenue: { $sum: { $multiply: ['$price', '$quantity'] } },
},
},
{ $sort: { totalRevenue: -1, _id: 1 } },
]);
// Sort with computed fields
db.sales.aggregate([
{
$project: {
product: 1,
total: { $multiply: ['$price', '$quantity'] },
},
},
{ $sort: { total: -1 } },
]);
5. $limit and $skip - Pagination
Control the number of results:
// Get top 3 sales by value
db.sales.aggregate([
{
$project: {
product: 1,
customer: '$customer.name',
total: { $multiply: ['$price', '$quantity'] },
},
},
{ $sort: { total: -1 } },
{ $limit: 3 },
]);
// Pagination: Skip first 2, take next 3
db.sales.aggregate([{ $sort: { saleDate: -1 } }, { $skip: 2 }, { $limit: 3 }]);
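For page-based navigation, the `$skip` value is usually derived from a page number. The sketch below builds the two paging stages from a 1-based page number and a page size; the helper name `paginationStages` is hypothetical, and adding `_id` as a sort tie-breaker is an assumption made here so that documents with equal `saleDate` values keep a stable order across pages.

```javascript
// Build the $skip/$limit stages for a 1-based page number.
function paginationStages(page, pageSize) {
  if (!Number.isInteger(page) || page < 1) {
    throw new RangeError('page must be an integer >= 1');
  }
  if (!Number.isInteger(pageSize) || pageSize < 1) {
    throw new RangeError('pageSize must be an integer >= 1');
  }
  return [{ $skip: (page - 1) * pageSize }, { $limit: pageSize }];
}

// Sort first so that page boundaries are stable, then append the paging stages.
const pageTwoPipeline = [
  { $sort: { saleDate: -1, _id: 1 } },
  ...paginationStages(2, 3), // page 2 with 3 documents per page
];

console.log(JSON.stringify(pageTwoPipeline));
```

The resulting array can be passed directly to `db.sales.aggregate(pageTwoPipeline)`. Note that large `$skip` values still make the server walk past the skipped documents, so deep pages get slower.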
6. $unwind - Deconstructing Arrays
Convert array elements to separate documents:
// Sample document with array
db.orders.insertOne({
_id: 'order1',
customer: 'John Doe',
items: [
{ product: 'Laptop', price: 1200, qty: 1 },
{ product: 'Mouse', price: 25, qty: 2 },
{ product: 'Keyboard', price: 75, qty: 1 },
],
orderDate: ISODate('2024-01-15'),
});
// Unwind items array
db.orders.aggregate([{ $unwind: '$items' }]);
// Result: Creates separate document for each item
/*
{
_id: "order1",
customer: "John Doe",
items: { product: "Laptop", price: 1200, qty: 1 },
orderDate: ISODate("2024-01-15")
}
{
_id: "order1",
customer: "John Doe",
items: { product: "Mouse", price: 25, qty: 2 },
orderDate: ISODate("2024-01-15")
}
// ... etc
*/
// Unwind with preserveNullAndEmptyArrays
db.orders.aggregate([{ $unwind: { path: '$items', preserveNullAndEmptyArrays: true } }]);
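The effect of `preserveNullAndEmptyArrays` is easiest to see on documents whose array is empty, missing, or null. The following plain-JavaScript sketch (not MongoDB code) approximates the behavior for top-level fields only; the function name `unwind` and the sample documents are hypothetical.

```javascript
// Approximates $unwind on one top-level field for plain objects.
function unwind(docs, field, { preserveNullAndEmptyArrays = false } = {}) {
  const output = [];
  for (const doc of docs) {
    const value = doc[field];
    if (Array.isArray(value) && value.length > 0) {
      // One output document per array element.
      for (const element of value) output.push({ ...doc, [field]: element });
    } else if (Array.isArray(value) || value === null || value === undefined) {
      // Empty, null, or missing: dropped unless preservation is requested.
      if (!preserveNullAndEmptyArrays) continue;
      const copy = { ...doc };
      if (value !== null) delete copy[field]; // an empty array is emitted without the field
      output.push(copy);
    } else {
      // A non-array value is treated like a single-element array.
      output.push({ ...doc });
    }
  }
  return output;
}

const unwindDemoOrders = [
  { _id: 1, items: ['Laptop', 'Mouse'] },
  { _id: 2, items: [] },
  { _id: 3 },
  { _id: 4, items: null },
];

const unwindDefault = unwind(unwindDemoOrders, 'items');
const unwindPreserved = unwind(unwindDemoOrders, 'items', {
  preserveNullAndEmptyArrays: true,
});

console.log(unwindDefault.length, unwindPreserved.length);
```

With the default settings only order 1 contributes output documents, while the preserving variant keeps one document for each of the other three orders as well.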
7. $lookup - Joins
Perform left outer joins with other collections:
// Create products collection
// The name values match the product values in sales so that the joins below find them
db.products.insertMany([
{ _id: 'laptop', name: 'Laptop', category: 'Electronics', brand: 'TechBrand' },
{ _id: 'mouse', name: 'Mouse', category: 'Electronics', brand: 'TechBrand' },
{ _id: 'chair', name: 'Desk Chair', category: 'Furniture', brand: 'ComfortCo' },
]);
// Join sales with products
db.sales.aggregate([
{
$lookup: {
from: 'products', // Collection to join
localField: 'product', // Field in sales collection
foreignField: 'name', // Field in products collection
as: 'productDetails', // Output array field
},
},
]);
// Lookup with pipeline (more complex joins)
db.sales.aggregate([
{
$lookup: {
from: 'products',
let: { productName: '$product' },
pipeline: [
{ $match: { $expr: { $eq: ['$name', '$$productName'] } } },
{ $project: { name: 1, brand: 1, _id: 0 } },
],
as: 'productInfo',
},
},
]);
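// Unwind lookup results (convert array to object); sales with no matching product are dropped
// unless $unwind is given preserveNullAndEmptyArrays: true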
db.sales.aggregate([
{
$lookup: {
from: 'products',
localField: 'product',
foreignField: 'name',
as: 'productDetails',
},
},
{ $unwind: '$productDetails' },
{
$project: {
product: 1,
brand: '$productDetails.brand',
price: 1,
quantity: 1,
},
},
]);
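Because `$lookup` is a left outer join, a sale with no matching product still comes through, just with an empty array in the `as` field. The plain-JavaScript sketch below (not MongoDB code) mimics that behavior on hypothetical in-memory data and shows why a plain `$unwind` afterwards turns the result into an inner join. The function name `lookup` is an illustration only.

```javascript
const lookupDemoSales = [
  { product: 'Laptop', price: 1200 },
  { product: 'Keyboard', price: 75 }, // no matching product document below
];
const lookupDemoProducts = [{ name: 'Laptop', brand: 'TechBrand' }];

// Mimics { $lookup: { from, localField, foreignField, as } } on plain arrays.
function lookup(localDocs, foreignDocs, localField, foreignField, as) {
  return localDocs.map((doc) => ({
    ...doc,
    [as]: foreignDocs.filter(
      (candidate) => candidate[foreignField] === doc[localField],
    ),
  }));
}

const lookupJoined = lookup(
  lookupDemoSales,
  lookupDemoProducts,
  'product',
  'name',
  'productDetails',
);

// A plain $unwind emits one document per array element, so empty arrays vanish.
const lookupUnwound = lookupJoined.flatMap((doc) =>
  doc.productDetails.map((details) => ({ ...doc, productDetails: details })),
);

console.log(lookupJoined.length, lookupUnwound.length);
```

If unmatched sales should survive the unwind, `preserveNullAndEmptyArrays: true` on the `$unwind` stage is the usual way to keep them.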
8. $addFields - Adding Fields
Add new fields to existing documents:
// Add calculated fields
db.sales.aggregate([
{
$addFields: {
total: { $multiply: ['$price', '$quantity'] },
saleMonth: { $month: '$saleDate' },
customerInitials: {
$concat: [
{ $substr: ['$customer.name', 0, 1] },
'.',
{ $substr: [{ $arrayElemAt: [{ $split: ['$customer.name', ' '] }, 1] }, 0, 1] },
'.',
],
},
},
},
]);
// Add conditional fields
db.sales.aggregate([
{
$addFields: {
salesCategory: {
$switch: {
branches: [
{ case: { $lt: ['$price', 50] }, then: 'Budget' },
{ case: { $lt: ['$price', 200] }, then: 'Mid-range' },
{ case: { $gte: ['$price', 200] }, then: 'Premium' },
],
default: 'Unknown',
},
},
},
},
]);
9. $replaceRoot - Promoting Nested Documents
Replace the root document with a nested document:
// Sample nested data
db.users.insertOne({
_id: 'user1',
username: 'alice123',
profile: {
name: 'Alice Johnson',
email: 'alice@example.com',
age: 28,
address: {
city: 'New York',
country: 'USA',
},
},
lastLogin: ISODate('2024-01-15'),
});
// Promote profile to root level
db.users.aggregate([{ $replaceRoot: { newRoot: '$profile' } }]);
// Promote with additional fields
db.users.aggregate([
{
$replaceRoot: {
newRoot: {
$mergeObjects: ['$profile', { username: '$username', lastLogin: '$lastLogin' }],
},
},
},
]);
10. $facet - Multiple Pipelines
Execute multiple aggregation pipelines on the same data:
// Multiple analyses in one query
db.sales.aggregate([
{
$facet: {
categorySummary: [
{ $group: { _id: '$category', count: { $sum: 1 } } },
{ $sort: { count: -1 } },
],
regionSummary: [
{
$group: {
_id: '$customer.region',
totalRevenue: { $sum: { $multiply: ['$price', '$quantity'] } },
},
},
{ $sort: { totalRevenue: -1 } },
],
priceRanges: [
{
$bucket: {
groupBy: '$price',
boundaries: [0, 50, 200, 500, 2000],
default: 'Other',
output: { count: { $sum: 1 } },
},
},
],
},
},
]);
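The `$bucket` stage in the `priceRanges` facet assigns each document to the bucket whose lower boundary is inclusive and whose upper boundary is exclusive, and the bucket's `_id` is its lower boundary. The plain-JavaScript sketch below (not MongoDB code) applies that rule to the five sample prices; the helper name `bucketId` is hypothetical.

```javascript
// Returns the lower boundary of the matching bucket, or defaultId if none matches.
function bucketId(value, boundaries, defaultId) {
  for (let i = 0; i < boundaries.length - 1; i += 1) {
    // Each bucket includes its lower bound and excludes its upper bound.
    if (value >= boundaries[i] && value < boundaries[i + 1]) return boundaries[i];
  }
  return defaultId;
}

const bucketDemoBoundaries = [0, 50, 200, 500, 2000];
const bucketDemoPrices = [1200, 25, 300, 400, 75]; // prices from the sample sales

const bucketDemoCounts = {};
for (const price of bucketDemoPrices) {
  const id = bucketId(price, bucketDemoBoundaries, 'Other');
  bucketDemoCounts[id] = (bucketDemoCounts[id] || 0) + 1;
}

// Counts per bucket: 0 -> 1, 50 -> 1, 200 -> 2, 500 -> 1
console.log(bucketDemoCounts);
```

A price of exactly 2000 would land in `'Other'`, because the last boundary is an exclusive upper bound.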
Advanced Aggregation Operators
Array Operators
// Sample document with arrays
db.surveys.insertOne({
_id: 'survey1',
respondent: 'John Doe',
scores: [85, 92, 78, 90, 88],
categories: ['tech', 'design', 'business'],
responses: [
{ question: 'Q1', score: 85 },
{ question: 'Q2', score: 92 },
{ question: 'Q3', score: 78 },
],
});
// Array operations
db.surveys.aggregate([
{
$project: {
respondent: 1,
totalScores: { $sum: '$scores' },
avgScore: { $avg: '$scores' },
maxScore: { $max: '$scores' },
minScore: { $min: '$scores' },
scoreCount: { $size: '$scores' },
firstScore: { $arrayElemAt: ['$scores', 0] },
lastScore: { $arrayElemAt: ['$scores', -1] },
highScores: {
$filter: {
input: '$scores',
cond: { $gte: ['$$this', 90] },
},
},
categoryList: {
$reduce: {
input: '$categories',
initialValue: '',
in: {
$concat: [
'$$value',
{ $cond: [{ $eq: ['$$value', ''] }, '', ', '] },
'$$this',
],
},
},
},
},
},
]);
Date Operators
// Date manipulations
db.sales.aggregate([
{
$project: {
product: 1,
saleDate: 1,
year: { $year: '$saleDate' },
month: { $month: '$saleDate' },
dayOfWeek: { $dayOfWeek: '$saleDate' },
dayOfYear: { $dayOfYear: '$saleDate' },
weekOfYear: { $week: '$saleDate' },
quarter: {
$ceil: { $divide: [{ $month: '$saleDate' }, 3] },
},
formattedDate: {
$dateToString: {
format: '%Y-%m-%d',
date: '$saleDate',
},
},
// Date arithmetic: 30 days before the sale date ($dateSubtract requires MongoDB 5.0+)
thirtyDaysBeforeSale: {
$dateSubtract: {
startDate: '$saleDate',
unit: 'day',
amount: 30,
},
},
},
},
]);
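Several of these operators use different numbering from the JavaScript `Date` methods, which is a common source of off-by-one mistakes when checking results by hand. The sketch below recomputes a few of the fields in plain JavaScript for the first sample sale date, assuming UTC throughout (the default for the date operators when no timezone is given). The variable names are for illustration only.

```javascript
const dateDemo = new Date('2024-01-15T00:00:00Z');

// $month is 1-12, while getUTCMonth() is 0-11.
const dateDemoMonth = dateDemo.getUTCMonth() + 1;

// $dayOfWeek runs from 1 (Sunday) to 7 (Saturday); getUTCDay() runs from 0 to 6.
const dateDemoDayOfWeek = dateDemo.getUTCDay() + 1;

// Quarter, using the same ceil(month / 3) formula as the pipeline.
const dateDemoQuarter = Math.ceil(dateDemoMonth / 3);

// Counterpart of $dateSubtract with unit 'day' and amount 30.
const dateDemoMinus30 = new Date(dateDemo.getTime() - 30 * 24 * 60 * 60 * 1000);

console.log(dateDemoMonth, dateDemoDayOfWeek, dateDemoQuarter);
console.log(dateDemoMinus30.toISOString().slice(0, 10));
```

15 January 2024 is a Monday, so the `$dayOfWeek` counterpart is 2 rather than the 1 that `getUTCDay()` returns.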
String Operators
// String manipulations
db.sales.aggregate([
{
$project: {
product: 1,
productUpper: { $toUpper: '$product' },
productLower: { $toLower: '$product' },
productLength: { $strLenCP: '$product' },
productSubstring: { $substr: ['$product', 0, 3] },
customerFirstName: {
$arrayElemAt: [{ $split: ['$customer.name', ' '] }, 0],
},
customerLastName: {
$arrayElemAt: [{ $split: ['$customer.name', ' '] }, -1],
},
productCode: {
$concat: [{ $substr: ['$category', 0, 2] }, '-', { $substr: ['$product', 0, 3] }],
},
},
},
]);
Complex Aggregation Examples
1. Sales Analytics Dashboard
// Comprehensive sales analysis
db.sales.aggregate([
// Add calculated fields
{
$addFields: {
total: { $multiply: ['$price', '$quantity'] },
saleMonth: { $month: '$saleDate' },
saleYear: { $year: '$saleDate' },
},
},
// Multiple facets for dashboard
{
$facet: {
// Monthly sales trend
monthlySales: [
{
$group: {
_id: { year: '$saleYear', month: '$saleMonth' },
revenue: { $sum: '$total' },
orders: { $sum: 1 },
avgOrderValue: { $avg: '$total' },
},
},
{ $sort: { '_id.year': 1, '_id.month': 1 } },
],
// Top salespeople
topSalespeople: [
{
$group: {
_id: '$salesperson',
totalRevenue: { $sum: '$total' },
totalOrders: { $sum: 1 },
avgOrderValue: { $avg: '$total' },
},
},
{ $sort: { totalRevenue: -1 } },
{ $limit: 5 },
],
// Category performance
categoryPerformance: [
{
$group: {
_id: '$category',
revenue: { $sum: '$total' },
units: { $sum: '$quantity' },
avgPrice: { $avg: '$price' },
},
},
{ $sort: { revenue: -1 } },
],
// Regional analysis
regionalAnalysis: [
{
$group: {
_id: '$customer.region',
revenue: { $sum: '$total' },
customers: { $addToSet: '$customer.email' },
},
},
{
$project: {
region: '$_id',
revenue: 1,
customerCount: { $size: '$customers' },
avgRevenuePerCustomer: { $divide: ['$revenue', { $size: '$customers' }] },
},
},
{ $sort: { revenue: -1 } },
],
},
},
]);
2. Customer Segmentation
// RFM Analysis (Recency, Frequency, Monetary)
db.sales.aggregate([
// Add total and days since last purchase
{
$addFields: {
total: { $multiply: ['$price', '$quantity'] },
daysSinceLastPurchase: {
$divide: [
{ $subtract: [new Date(), '$saleDate'] },
86400000, // milliseconds in a day
],
},
},
},
// Group by customer
{
$group: {
_id: '$customer.email',
customerName: { $first: '$customer.name' },
region: { $first: '$customer.region' },
recency: { $min: '$daysSinceLastPurchase' }, // Days since last purchase
frequency: { $sum: 1 }, // Number of purchases
monetary: { $sum: '$total' }, // Total spent
firstPurchase: { $min: '$saleDate' },
lastPurchase: { $max: '$saleDate' },
},
},
// Create customer segments
{
$addFields: {
recencyScore: {
$cond: [
{ $lte: ['$recency', 30] },
5,
{
$cond: [
{ $lte: ['$recency', 60] },
4,
{
$cond: [
{ $lte: ['$recency', 90] },
3,
{ $cond: [{ $lte: ['$recency', 180] }, 2, 1] },
],
},
],
},
],
},
frequencyScore: {
$cond: [
{ $gte: ['$frequency', 5] },
5,
{
$cond: [
{ $gte: ['$frequency', 3] },
4,
{ $cond: [{ $gte: ['$frequency', 2] }, 3, 2] },
],
},
],
},
monetaryScore: {
$cond: [
{ $gte: ['$monetary', 1000] },
5,
{
$cond: [
{ $gte: ['$monetary', 500] },
4,
{
$cond: [
{ $gte: ['$monetary', 200] },
3,
{ $cond: [{ $gte: ['$monetary', 50] }, 2, 1] },
],
},
],
},
],
},
},
},
// Determine customer segment
{
$addFields: {
rfmScore: { $add: ['$recencyScore', '$frequencyScore', '$monetaryScore'] },
segment: {
$switch: {
branches: [
{
case: {
$and: [
{ $gte: ['$recencyScore', 4] },
{ $gte: ['$frequencyScore', 4] },
{ $gte: ['$monetaryScore', 4] },
],
},
then: 'Champions',
},
{
case: {
$and: [
{ $gte: ['$recencyScore', 3] },
{ $gte: ['$frequencyScore', 3] },
{ $gte: ['$monetaryScore', 3] },
],
},
then: 'Loyal Customers',
},
{
case: {
$and: [
{ $gte: ['$recencyScore', 4] },
{ $lte: ['$frequencyScore', 2] },
],
},
then: 'New Customers',
},
{
case: {
$and: [
{ $lte: ['$recencyScore', 2] },
{ $gte: ['$monetaryScore', 3] },
],
},
then: 'At Risk',
},
],
default: 'Others',
},
},
},
},
{ $sort: { rfmScore: -1 } },
]);
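The three scoring expressions share one shape: a ladder of `$cond` checks against thresholds. Since a pipeline is ordinary JavaScript data in the shell or a driver, the ladder can be generated instead of typed by hand. The sketch below is one possible way to do that; the helper name `buildScoreExpression` is hypothetical and not part of MongoDB.

```javascript
// Build a nested $cond expression from [threshold, score] pairs.
// `operator` is '$lte' for recency (lower is better) or '$gte' for
// frequency and monetary value (higher is better).
function buildScoreExpression(fieldPath, operator, tiers, fallbackScore) {
  // Fold from the last tier to the first so the first tier ends up outermost.
  return tiers.reduceRight(
    (elseBranch, [threshold, score]) => ({
      $cond: [{ [operator]: [fieldPath, threshold] }, score, elseBranch],
    }),
    fallbackScore,
  );
}

// Same thresholds as the recencyScore field in the pipeline above.
const recencyScoreExpr = buildScoreExpression(
  '$recency',
  '$lte',
  [[30, 5], [60, 4], [90, 3], [180, 2]],
  1,
);

console.log(JSON.stringify(recencyScoreExpr, null, 2));
```

The generated object can be dropped into the `$addFields` stage as `recencyScore: recencyScoreExpr`, which keeps the thresholds in one short table that is easier to review and adjust.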
3. Cohort Analysis
// Customer cohort analysis
db.sales.aggregate([
// Add helper fields
{
$addFields: {
total: { $multiply: ['$price', '$quantity'] },
orderMonth: {
$dateFromParts: {
year: { $year: '$saleDate' },
month: { $month: '$saleDate' },
day: 1,
},
},
},
},
// Get first purchase date for each customer
{
$group: {
_id: '$customer.email',
firstPurchaseMonth: { $min: '$orderMonth' },
orders: {
$push: {
orderMonth: '$orderMonth',
total: '$total',
},
},
},
},
// Unwind orders to process each separately
{ $unwind: '$orders' },
// Calculate whole calendar months since first purchase ($dateDiff requires MongoDB 5.0+)
{
$addFields: {
monthsSinceFirst: {
$dateDiff: {
startDate: '$firstPurchaseMonth',
endDate: '$orders.orderMonth',
unit: 'month',
},
},
},
},
// Group by cohort and period
{
$group: {
_id: {
cohortMonth: '$firstPurchaseMonth',
period: { $floor: '$monthsSinceFirst' },
},
customers: { $addToSet: '$_id' },
revenue: { $sum: '$orders.total' },
},
},
// Collect per-period customer counts and revenue for each cohort
{
$group: {
_id: '$_id.cohortMonth',
periods: {
$push: {
period: '$_id.period',
customers: { $size: '$customers' },
revenue: '$revenue',
},
},
},
},
{ $sort: { _id: 1 } },
]);
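The period number in a cohort table is meant to be a calendar-month offset from the first purchase month. The plain-JavaScript sketch below (not MongoDB code) contrasts counting by calendar parts with dividing elapsed milliseconds by a fixed 30-day month, assuming all dates are first-of-month UTC values like `orderMonth` in the pipeline. The function name `monthsBetween` is hypothetical.

```javascript
// Whole calendar months between two dates, using UTC year and month parts.
function monthsBetween(start, end) {
  return (
    (end.getUTCFullYear() - start.getUTCFullYear()) * 12 +
    (end.getUTCMonth() - start.getUTCMonth())
  );
}

const cohortStart = new Date('2023-01-01T00:00:00Z');
const cohortOrder = new Date('2023-03-01T00:00:00Z');

// Calendar-based offset: January to March is 2 months.
const calendarOffset = monthsBetween(cohortStart, cohortOrder);

// Fixed 30-day months: January plus February 2023 is 59 days,
// and 59 / 30 is about 1.97, which floors to 1.
const thirtyDayOffset = Math.floor((cohortOrder - cohortStart) / 2592000000);

console.log(calendarOffset, thirtyDayOffset);
```

The two approaches disagree whenever the elapsed days fall just short of a multiple of 30, which would place an order in the wrong cohort period.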
Performance Optimization
1. Pipeline Optimization
// ✅ Good: Filter early with $match
db.sales.aggregate([
{ $match: { saleDate: { $gte: ISODate('2024-01-01') } } }, // Filter first
{ $group: { _id: '$category', total: { $sum: '$price' } } },
{ $sort: { total: -1 } },
]);
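// ❌ Bad: Group every category, then discard most of the results
db.sales.aggregate([
{ $group: { _id: '$category', total: { $sum: '$price' } } },
{ $sort: { total: -1 } },
{ $match: { _id: 'Electronics' } }, // Could have been { $match: { category: 'Electronics' } } as the first stage
]);
// Note: a $match on a computed value such as total has to stay after the $group that produces it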
2. Index Usage
// Create indexes to support aggregation stages
db.sales.createIndex({ saleDate: 1 }); // For $match on dates
db.sales.createIndex({ category: 1, price: 1 }); // For $match on category combined with $sort on price
db.sales.createIndex({ 'customer.region': 1 }); // For nested field queries
// Use $match stages that can use indexes
db.sales.aggregate([
{ $match: { category: 'Electronics' } }, // Uses index
// ... rest of pipeline
]);
3. Memory Usage
// For large datasets, consider using allowDiskUse
db.sales.aggregate(
[
// ... complex pipeline
],
{ allowDiskUse: true },
);
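// Inspect the in-memory limit for blocking sorts (this parameter name is from older releases;
// MongoDB 4.4+ exposes it as internalQueryMaxBlockingSortMemoryUsageBytes)
db.adminCommand({ getParameter: 1, internalQueryExecMaxBlockingSortBytes: 1 });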
Common Aggregation Patterns
1. Top N per Group
// Top 2 products per category by revenue
db.sales.aggregate([
{
$addFields: {
revenue: { $multiply: ['$price', '$quantity'] },
},
},
{ $sort: { category: 1, revenue: -1 } },
{
$group: {
_id: '$category',
topProducts: { $push: '$$ROOT' },
},
},
{
$project: {
category: '$_id',
topProducts: { $slice: ['$topProducts', 2] },
},
},
]);
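To check what this pattern should return for the sample data, the plain-JavaScript sketch below (not MongoDB code) follows the same four steps: compute revenue, sort, collect per category, and keep the first `n` entries. The function name `topNPerGroup` is hypothetical.

```javascript
// The five sample sales, reduced to the fields this pattern uses.
const topNDemoSales = [
  { product: 'Laptop', category: 'Electronics', price: 1200, quantity: 2 },
  { product: 'Mouse', category: 'Electronics', price: 25, quantity: 5 },
  { product: 'Desk Chair', category: 'Furniture', price: 300, quantity: 1 },
  { product: 'Monitor', category: 'Electronics', price: 400, quantity: 3 },
  { product: 'Keyboard', category: 'Electronics', price: 75, quantity: 4 },
];

function topNPerGroup(docs, n) {
  // Like $addFields: attach the computed revenue.
  const withRevenue = docs.map((doc) => ({
    ...doc,
    revenue: doc.price * doc.quantity,
  }));
  // Like $sort: category ascending, then revenue descending.
  withRevenue.sort(
    (a, b) => a.category.localeCompare(b.category) || b.revenue - a.revenue,
  );
  // Like $group with $push: '$$ROOT': collect documents per category in sorted order.
  const groups = new Map();
  for (const doc of withRevenue) {
    if (!groups.has(doc.category)) groups.set(doc.category, []);
    groups.get(doc.category).push(doc);
  }
  // Like $slice: keep only the first n documents of each group.
  return [...groups].map(([category, items]) => ({
    category,
    topProducts: items.slice(0, n),
  }));
}

const topTwoPerCategory = topNPerGroup(topNDemoSales, 2);
console.log(JSON.stringify(topTwoPerCategory, null, 2));
```

For the sample data this yields Laptop (2400) and Monitor (1200) for Electronics, and Desk Chair (300) for Furniture. Keep in mind that the pipeline version pushes every document of a group into memory before slicing, which can get expensive for large groups.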
2. Moving Averages
// 3-day moving average of sales ($setWindowFields requires MongoDB 5.0+; assumes one document per day)
db.dailySales.aggregate([
{ $sort: { date: 1 } },
{
$setWindowFields: {
sortBy: { date: 1 },
output: {
movingAvg: {
$avg: '$sales',
window: {
documents: [-2, 0], // Current + 2 previous days
},
},
},
},
},
]);
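A `documents: [-2, 0]` window covers the current document and up to two before it, so the first results average fewer than three values. The plain-JavaScript sketch below (not MongoDB code) reproduces that behavior on a hypothetical list of daily totals that is already sorted by date; the function name `movingAverage` is an illustration only.

```javascript
// Average of the current value and up to (windowSize - 1) previous values.
function movingAverage(values, windowSize) {
  return values.map((_, index) => {
    const start = Math.max(0, index - (windowSize - 1));
    const windowValues = values.slice(start, index + 1);
    const total = windowValues.reduce((sum, value) => sum + value, 0);
    return total / windowValues.length;
  });
}

// Hypothetical daily sales totals, one value per day, oldest first.
const movingAvgDemo = movingAverage([100, 200, 300, 400], 3);

console.log(movingAvgDemo); // [ 100, 150, 200, 300 ]
```

The first two results (100 and 150) are averages over one and two days respectively, which matches how the window is truncated at the start of the partition.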
3. Pivot Tables
// Sales by category and region (pivot table)
db.sales.aggregate([
{
$addFields: {
revenue: { $multiply: ['$price', '$quantity'] },
},
},
{
$group: {
_id: {
category: '$category',
region: '$customer.region',
},
revenue: { $sum: '$revenue' },
},
},
{
$group: {
_id: '$_id.category',
regions: {
$push: {
region: '$_id.region',
revenue: '$revenue',
},
},
totalRevenue: { $sum: '$revenue' },
},
},
]);
What's Next?
Now that you've mastered the Aggregation Pipeline, you're ready to integrate MongoDB with applications using Node.js Integration, or learn about Production Deployment.
Series Navigation
- Previous: MongoDB Indexing and Performance
- Next: MongoDB with Node.js Integration
- Hub: MongoDB Zero to Hero - Complete Guide
This is Part 6 of the MongoDB Zero to Hero series. The Aggregation Pipeline is one of MongoDB's most powerful features - master it to unlock advanced data processing capabilities.