
MongoDB Aggregation Pipeline: Advanced Data Processing

Welcome to Part 6 of our MongoDB Zero to Hero series. After mastering indexing and performance, it's time to explore MongoDB's most powerful feature for data processing: the Aggregation Pipeline.

What is the Aggregation Pipeline?

The Aggregation Pipeline is a framework for data aggregation modeled on the concept of data processing pipelines. Documents enter a multi-stage pipeline that transforms the documents into aggregated results.

Think of it like an assembly line where each stage performs a specific operation on the data before passing it to the next stage.

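// Basic pipeline structure: an array of stages, each an object with one stage operator
db.sales.aggregate([
    { $match: { category: 'Electronics' } }, // Stage 1: filter documents
    { $group: { _id: '$salesperson', units: { $sum: '$quantity' } } }, // Stage 2: aggregate
    { $sort: { units: -1 } }, // Stage 3: order the results
    // ... more stages
]);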
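The assembly-line idea can be sketched in plain JavaScript, without a database: a pipeline is an ordered list of stage functions, and each stage receives the array the previous stage produced. The helpers `runPipeline`, `matchStage`, and `sortStage` are hypothetical illustrations, not part of mongosh or any MongoDB driver.

```javascript
// Hypothetical in-memory model of a pipeline: each stage maps an array of documents to a new array.
function runPipeline(docs, stages) {
    // Feed the output of each stage into the next one, like an assembly line.
    return stages.reduce((current, stage) => stage(current), docs);
}

// Two illustrative stage factories (not MongoDB APIs).
const matchStage = (predicate) => (docs) => docs.filter(predicate);
const sortStage = (key, direction) => (docs) =>
    [...docs].sort((a, b) => (a[key] < b[key] ? -direction : a[key] > b[key] ? direction : 0));

const sales = [
    { product: 'Laptop', category: 'Electronics', price: 1200 },
    { product: 'Desk Chair', category: 'Furniture', price: 300 },
    { product: 'Mouse', category: 'Electronics', price: 25 },
];

// Roughly analogous to [{ $match: { category: 'Electronics' } }, { $sort: { price: -1 } }]
const result = runPipeline(sales, [
    matchStage((doc) => doc.category === 'Electronics'),
    sortStage('price', -1),
]);
console.log(result.map((doc) => doc.product)); // [ 'Laptop', 'Mouse' ]
```

An empty stage list returns the input unchanged, which matches the intuition that a pipeline with no stages passes documents straight through.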

Sample Data Setup

Let's create sample data for our examples:

// Create sample sales data
db.sales.insertMany([
    {
        _id: 'sales1', // Plain string _ids; ObjectId('sales1') would throw because ObjectId strings must be 24 hex characters
        product: 'Laptop',
        category: 'Electronics',
        price: 1200,
        quantity: 2,
        saleDate: ISODate('2024-01-15'),
        customer: {
            name: 'Alice Johnson',
            email: 'alice@example.com',
            region: 'North',
        },
        salesperson: 'John Doe',
    },
    {
        _id: 'sales2',
        product: 'Mouse',
        category: 'Electronics',
        price: 25,
        quantity: 5,
        saleDate: ISODate('2024-01-16'),
        customer: {
            name: 'Bob Smith',
            email: 'bob@example.com',
            region: 'South',
        },
        salesperson: 'Jane Smith',
    },
    {
        _id: 'sales3',
        product: 'Desk Chair',
        category: 'Furniture',
        price: 300,
        quantity: 1,
        saleDate: ISODate('2024-01-17'),
        customer: {
            name: 'Carol Davis',
            email: 'carol@example.com',
            region: 'North',
        },
        salesperson: 'John Doe',
    },
    {
        _id: 'sales4',
        product: 'Monitor',
        category: 'Electronics',
        price: 400,
        quantity: 3,
        saleDate: ISODate('2024-01-18'),
        customer: {
            name: 'David Wilson',
            email: 'david@example.com',
            region: 'West',
        },
        salesperson: 'Jane Smith',
    },
    {
        _id: 'sales5',
        product: 'Keyboard',
        category: 'Electronics',
        price: 75,
        quantity: 4,
        saleDate: ISODate('2024-01-19'),
        customer: {
            name: 'Emma Brown',
            email: 'emma@example.com',
            region: 'East',
        },
        salesperson: 'John Doe',
    },
]);

Core Aggregation Stages

1. $match - Filtering Documents

Filters documents using the same query syntax as find():

// Find electronics sales
db.sales.aggregate([{ $match: { category: 'Electronics' } }]);

// Multiple conditions
db.sales.aggregate([
    {
        $match: {
            category: 'Electronics',
            price: { $gte: 50 },
        },
    },
]);

// Complex matching with date ranges
db.sales.aggregate([
    {
        $match: {
            saleDate: {
                $gte: ISODate('2024-01-16'),
                $lt: ISODate('2024-01-19'),
            },
        },
    },
]);

// Match with regex
db.sales.aggregate([
    { $match: { product: /^L/ } }, // Products starting with 'L'
]);
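The date-range example uses a half-open interval: `$gte` includes the start instant and `$lt` excludes the end, so the sale dated `2024-01-19` is not returned. The plain JavaScript sketch below mirrors that predicate over the sample dates; `inHalfOpenRange` is a hypothetical helper that models the comparison logic only, not how MongoDB evaluates queries internally.

```javascript
// Sample sale dates (date-only ISO strings are parsed as UTC midnight, like ISODate('2024-01-15')).
const saleDates = {
    sales1: new Date('2024-01-15'),
    sales2: new Date('2024-01-16'),
    sales3: new Date('2024-01-17'),
    sales4: new Date('2024-01-18'),
    sales5: new Date('2024-01-19'),
};

// Mirrors { saleDate: { $gte: start, $lt: end } }: start is inclusive, end is exclusive.
function inHalfOpenRange(date, start, end) {
    return date.getTime() >= start.getTime() && date.getTime() < end.getTime();
}

const start = new Date('2024-01-16');
const end = new Date('2024-01-19');
const matched = Object.keys(saleDates).filter((id) => inHalfOpenRange(saleDates[id], start, end));
console.log(matched); // [ 'sales2', 'sales3', 'sales4' ]
```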

2. $project - Shaping Output

Select, add, or transform fields:

// Select specific fields
db.sales.aggregate([
    {
        $project: {
            product: 1,
            price: 1,
            quantity: 1,
            _id: 0, // Exclude _id
        },
    },
]);

// Add calculated fields
db.sales.aggregate([
    {
        $project: {
            product: 1,
            price: 1,
            quantity: 1,
            total: { $multiply: ['$price', '$quantity'] }, // Calculate total
            customerName: '$customer.name', // Extract nested field
        },
    },
]);

// Complex transformations
db.sales.aggregate([
    {
        $project: {
            product: 1,
            priceRange: {
                $cond: [
                    { $lt: ['$price', 100] },
                    'Low',
                    {
                        $cond: [{ $lt: ['$price', 500] }, 'Medium', 'High'],
                    },
                ],
            },
            saleMonth: { $month: '$saleDate' },
            saleYear: { $year: '$saleDate' },
        },
    },
]);

3. $group - Grouping and Aggregating

Groups documents by the _id expression and computes accumulator values ($sum, $avg, $max, and so on) for each group:

// Group by category and count
db.sales.aggregate([
    {
        $group: {
            _id: '$category',
            count: { $sum: 1 },
            totalRevenue: { $sum: { $multiply: ['$price', '$quantity'] } },
        },
    },
]);

// Group by multiple fields
db.sales.aggregate([
    {
        $group: {
            _id: {
                category: '$category',
                region: '$customer.region',
            },
            averagePrice: { $avg: '$price' },
            maxPrice: { $max: '$price' },
            minPrice: { $min: '$price' },
        },
    },
]);

// Group by date parts
db.sales.aggregate([
    {
        $group: {
            _id: {
                year: { $year: '$saleDate' },
                month: { $month: '$saleDate' },
            },
            monthlySales: { $sum: { $multiply: ['$price', '$quantity'] } },
            avgOrderValue: { $avg: { $multiply: ['$price', '$quantity'] } },
        },
    },
]);

// Collect values into arrays
db.sales.aggregate([
    {
        $group: {
            _id: '$salesperson',
            products: { $push: '$product' }, // Array of all products
            uniqueProducts: { $addToSet: '$product' }, // Array of unique products
            totalSales: { $sum: { $multiply: ['$price', '$quantity'] } },
        },
    },
]);
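Conceptually, `$group` buckets documents by the `_id` expression and folds each bucket with its accumulators. The sketch below reproduces the first example (count and revenue per category) with a `Map` in plain JavaScript over the five sample sales; `groupByCategory` is a hypothetical helper. Note that MongoDB does not guarantee the order of `$group` output, so add a `$sort` when order matters.

```javascript
// The five sample sales, reduced to the fields the first $group example uses.
const sales = [
    { category: 'Electronics', price: 1200, quantity: 2 },
    { category: 'Electronics', price: 25, quantity: 5 },
    { category: 'Furniture', price: 300, quantity: 1 },
    { category: 'Electronics', price: 400, quantity: 3 },
    { category: 'Electronics', price: 75, quantity: 4 },
];

// Mirrors { $group: { _id: '$category', count: { $sum: 1 },
//                     totalRevenue: { $sum: { $multiply: ['$price', '$quantity'] } } } }
function groupByCategory(docs) {
    const groups = new Map();
    for (const doc of docs) {
        // Create the accumulator state the first time a group key is seen.
        if (!groups.has(doc.category)) {
            groups.set(doc.category, { _id: doc.category, count: 0, totalRevenue: 0 });
        }
        const acc = groups.get(doc.category);
        acc.count += 1; // $sum: 1
        acc.totalRevenue += doc.price * doc.quantity; // $sum of price * quantity
    }
    return [...groups.values()];
}

console.log(groupByCategory(sales));
// Electronics: count 4, totalRevenue 4025 (2400 + 125 + 1200 + 300); Furniture: count 1, totalRevenue 300
```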

4. $sort - Sorting Results

Sort documents by one or more fields:

// Sort by single field
db.sales.aggregate([
    { $match: { category: 'Electronics' } },
    { $sort: { price: -1 } }, // Descending order
]);

// Sort by multiple fields
db.sales.aggregate([
    {
        $group: {
            _id: '$category',
            totalRevenue: { $sum: { $multiply: ['$price', '$quantity'] } },
        },
    },
    { $sort: { totalRevenue: -1, _id: 1 } },
]);

// Sort with computed fields
db.sales.aggregate([
    {
        $project: {
            product: 1,
            total: { $multiply: ['$price', '$quantity'] },
        },
    },
    { $sort: { total: -1 } },
]);

5. $limit and $skip - Pagination

Control the number of results:

// Get top 3 sales by value
db.sales.aggregate([
    {
        $project: {
            product: 1,
            customer: '$customer.name',
            total: { $multiply: ['$price', '$quantity'] },
        },
    },
    { $sort: { total: -1 } },
    { $limit: 3 },
]);

// Pagination: skip the first 2, take the next 3
// (_id breaks ties so the page order is deterministic)
db.sales.aggregate([{ $sort: { saleDate: -1, _id: 1 } }, { $skip: 2 }, { $limit: 3 }]);
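`$skip` and `$limit` only paginate reliably when the preceding `$sort` is deterministic; if two documents share a sort key, their relative order can differ between queries and a document may appear on two pages or on none. A unique tie-breaker such as `_id` avoids this. The plain JavaScript sketch below applies the same sort, skip, and limit steps to an in-memory array; `paginate` is a hypothetical helper, not a MongoDB API.

```javascript
// Hypothetical in-memory equivalent of [{ $sort: { saleDate: -1, _id: 1 } }, { $skip: skip }, { $limit: limit }].
function paginate(docs, skip, limit) {
    const sorted = [...docs].sort((a, b) => {
        const byDate = b.saleDate - a.saleDate; // saleDate descending
        if (byDate !== 0) return byDate;
        return a._id < b._id ? -1 : a._id > b._id ? 1 : 0; // _id ascending breaks ties
    });
    return sorted.slice(skip, skip + limit);
}

const sales = [
    { _id: 'sales1', saleDate: new Date('2024-01-15') },
    { _id: 'sales2', saleDate: new Date('2024-01-16') },
    { _id: 'sales3', saleDate: new Date('2024-01-17') },
    { _id: 'sales4', saleDate: new Date('2024-01-18') },
    { _id: 'sales5', saleDate: new Date('2024-01-19') },
];

// Skip the two newest sales, then take up to three more.
console.log(paginate(sales, 2, 3).map((doc) => doc._id)); // [ 'sales3', 'sales2', 'sales1' ]
```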

6. $unwind - Deconstructing Arrays

Convert array elements to separate documents:

// Sample document with array
db.orders.insertOne({
    _id: 'order1',
    customer: 'John Doe',
    items: [
        { product: 'Laptop', price: 1200, qty: 1 },
        { product: 'Mouse', price: 25, qty: 2 },
        { product: 'Keyboard', price: 75, qty: 1 },
    ],
    orderDate: ISODate('2024-01-15'),
});

// Unwind items array
db.orders.aggregate([{ $unwind: '$items' }]);

// Result: one document per array element
/*
{
  _id: "order1",
  customer: "John Doe",
  items: { product: "Laptop", price: 1200, qty: 1 },
  orderDate: ISODate("2024-01-15")
}
{
  _id: "order1",
  customer: "John Doe",
  items: { product: "Mouse", price: 25, qty: 2 },
  orderDate: ISODate("2024-01-15")
}
// ... and a third document for the Keyboard item
*/

// Unwind with preserveNullAndEmptyArrays
db.orders.aggregate([{ $unwind: { path: '$items', preserveNullAndEmptyArrays: true } }]);
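What `$unwind` does, including the effect of `preserveNullAndEmptyArrays`, can be modelled in a few lines of plain JavaScript. The `unwind` helper below is a hypothetical sketch: it emits one copy of the document per array element and, when the field is missing, `null`, or an empty array, either drops the document (the default) or keeps it. It leaves out options such as `includeArrayIndex`, and `order2` is a hypothetical order added to show the empty-array case.

```javascript
// Hypothetical model of { $unwind: { path: '$<field>', preserveNullAndEmptyArrays } }.
function unwind(docs, field, preserveNullAndEmptyArrays = false) {
    const out = [];
    for (const doc of docs) {
        const value = doc[field];
        if (value === undefined || value === null || (Array.isArray(value) && value.length === 0)) {
            // Nothing to unwind: drop the document unless preservation was requested.
            // (Simplified: the field is left as it was.)
            if (preserveNullAndEmptyArrays) out.push({ ...doc });
            continue;
        }
        // A non-array value is treated like a one-element array.
        const elements = Array.isArray(value) ? value : [value];
        for (const element of elements) {
            out.push({ ...doc, [field]: element });
        }
    }
    return out;
}

const orders = [
    { _id: 'order1', items: [{ product: 'Laptop' }, { product: 'Mouse' }, { product: 'Keyboard' }] },
    { _id: 'order2', items: [] }, // hypothetical order with no items
];

console.log(unwind(orders, 'items').length); // 3
console.log(unwind(orders, 'items', true).length); // 4
```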

7. $lookup - Joins

Perform left outer joins with other collections:

// Create products collection
db.products.insertMany([
    // name must equal the product field in sales for the joins below to match
    { _id: 'laptop', name: 'Laptop', category: 'Electronics', brand: 'TechBrand' },
    { _id: 'mouse', name: 'Mouse', category: 'Electronics', brand: 'TechBrand' },
    { _id: 'chair', name: 'Desk Chair', category: 'Furniture', brand: 'ComfortCo' },
]);

// Join sales with products
db.sales.aggregate([
    {
        $lookup: {
            from: 'products', // Collection to join
            localField: 'product', // Field in sales collection
            foreignField: 'name', // Field in products collection
            as: 'productDetails', // Output array field
        },
    },
]);

// Lookup with pipeline (more complex joins)
db.sales.aggregate([
    {
        $lookup: {
            from: 'products',
            let: { productName: '$product' },
            pipeline: [
                { $match: { $expr: { $eq: ['$name', '$$productName'] } } },
                { $project: { name: 1, brand: 1, _id: 0 } },
            ],
            as: 'productInfo',
        },
    },
]);

// Unwind lookup results (convert array to object)
db.sales.aggregate([
    {
        $lookup: {
            from: 'products',
            localField: 'product',
            foreignField: 'name',
            as: 'productDetails',
        },
    },
    { $unwind: '$productDetails' }, // Drops sales with no matching product; add preserveNullAndEmptyArrays: true to keep them
    {
        $project: {
            product: 1,
            brand: '$productDetails.brand',
            price: 1,
            quantity: 1,
        },
    },
]);
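`$lookup` behaves like a left outer join: every input document survives and receives an array of matches, which is empty when nothing matches. A following `$unwind` without `preserveNullAndEmptyArrays` then discards the unmatched documents, which effectively turns the result into an inner join. The plain JavaScript sketch below shows both steps; `lookup` is a hypothetical helper simplified to strict equality, and the product names are assumed to match the `product` field in sales.

```javascript
// Hypothetical model of { $lookup: { from, localField, foreignField, as } } using strict equality.
function lookup(docs, foreignDocs, localField, foreignField, as) {
    return docs.map((doc) => ({
        ...doc,
        // Every input document is kept; unmatched ones get an empty array (left outer join).
        [as]: foreignDocs.filter((f) => f[foreignField] === doc[localField]),
    }));
}

const sales = [{ product: 'Laptop' }, { product: 'Mouse' }, { product: 'Monitor' }];
const products = [
    { name: 'Laptop', brand: 'TechBrand' },
    { name: 'Mouse', brand: 'TechBrand' },
    { name: 'Desk Chair', brand: 'ComfortCo' },
];

const joined = lookup(sales, products, 'product', 'name', 'productDetails');
console.log(joined.map((d) => d.productDetails.length)); // [ 1, 1, 0 ]

// A plain $unwind on productDetails keeps only the sales that found a match.
const innerJoined = joined.filter((d) => d.productDetails.length > 0);
console.log(innerJoined.map((d) => d.product)); // [ 'Laptop', 'Mouse' ]
```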

8. $addFields - Adding Fields

Adds new fields while keeping all existing fields (unlike an inclusion $project, which outputs only the fields you list):

// Add calculated fields
db.sales.aggregate([
    {
        $addFields: {
            total: { $multiply: ['$price', '$quantity'] },
            saleMonth: { $month: '$saleDate' },
            customerInitials: {
                $concat: [
                    { $substrCP: ['$customer.name', 0, 1] },
                    '.',
                    { $substrCP: [{ $arrayElemAt: [{ $split: ['$customer.name', ' '] }, 1] }, 0, 1] },
                    '.',
                ],
            },
        },
    },
]);

// Add conditional fields
db.sales.aggregate([
    {
        $addFields: {
            salesCategory: {
                $switch: {
                    branches: [
                        { case: { $lt: ['$price', 50] }, then: 'Budget' },
                        { case: { $lt: ['$price', 200] }, then: 'Mid-range' },
                        { case: { $gte: ['$price', 200] }, then: 'Premium' },
                    ],
                    default: 'Unknown',
                },
            },
        },
    },
]);

9. $replaceRoot - Promoting Nested Documents

Replace the root document with a nested document:

// Sample nested data
db.users.insertOne({
    _id: 'user1',
    username: 'alice123',
    profile: {
        name: 'Alice Johnson',
        email: 'alice@example.com',
        age: 28,
        address: {
            city: 'New York',
            country: 'USA',
        },
    },
    lastLogin: ISODate('2024-01-15'),
});

// Promote profile to root level
db.users.aggregate([{ $replaceRoot: { newRoot: '$profile' } }]);

// Promote with additional fields
db.users.aggregate([
    {
        $replaceRoot: {
            newRoot: {
                $mergeObjects: ['$profile', { username: '$username', lastLogin: '$lastLogin' }],
            },
        },
    },
]);
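`$mergeObjects` combines documents from left to right, and when the same field appears more than once the value from the last document wins. JavaScript object spread follows the same rule, so the sketch below reproduces the promoted document in plain JavaScript (the nested `address` is omitted for brevity); it illustrates the merge semantics only.

```javascript
const user = {
    username: 'alice123',
    profile: { name: 'Alice Johnson', email: 'alice@example.com', age: 28 },
    lastLogin: new Date('2024-01-15'),
};

// Like { $replaceRoot: { newRoot: { $mergeObjects: ['$profile', { username: '$username', lastLogin: '$lastLogin' }] } } }
const newRoot = { ...user.profile, username: user.username, lastLogin: user.lastLogin };
console.log(Object.keys(newRoot)); // [ 'name', 'email', 'age', 'username', 'lastLogin' ]

// On a field-name conflict, the later object wins.
console.log({ ...{ name: 'A' }, ...{ name: 'B' } }.name); // B
```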

10. $facet - Multiple Pipelines

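Runs several sub-pipelines on the same input documents within a single stage; each sub-pipeline's results are returned as an array field of one output document: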

// Multiple analyses in one query
db.sales.aggregate([
    {
        $facet: {
            categorySummary: [
                { $group: { _id: '$category', count: { $sum: 1 } } },
                { $sort: { count: -1 } },
            ],
            regionSummary: [
                {
                    $group: {
                        _id: '$customer.region',
                        totalRevenue: { $sum: { $multiply: ['$price', '$quantity'] } },
                    },
                },
                { $sort: { totalRevenue: -1 } },
            ],
            priceRanges: [
                {
                    $bucket: {
                        groupBy: '$price',
                        boundaries: [0, 50, 200, 500, 2000],
                        default: 'Other',
                        output: { count: { $sum: 1 } },
                    },
                },
            ],
        },
    },
]);
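In the `priceRanges` facet, each `$bucket` range is lower-inclusive and upper-exclusive, `[boundaries[i], boundaries[i+1])`, and the bucket's `_id` is its lower bound; values outside every range land in the `default` bucket. The plain JavaScript sketch below applies that assignment rule to the five sample prices; `bucketId` is a hypothetical helper.

```javascript
// Hypothetical model of how $bucket assigns a value to a bucket.
function bucketId(value, boundaries, defaultId) {
    for (let i = 0; i < boundaries.length - 1; i++) {
        // Lower bound inclusive, upper bound exclusive.
        if (value >= boundaries[i] && value < boundaries[i + 1]) return boundaries[i];
    }
    return defaultId; // outside every range
}

const boundaries = [0, 50, 200, 500, 2000];
const prices = [1200, 25, 300, 400, 75]; // Laptop, Mouse, Desk Chair, Monitor, Keyboard

// Count documents per bucket, like output: { count: { $sum: 1 } }.
const counts = {};
for (const price of prices) {
    const id = bucketId(price, boundaries, 'Other');
    counts[id] = (counts[id] || 0) + 1;
}
console.log(counts); // { '0': 1, '50': 1, '200': 2, '500': 1 }
```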

Advanced Aggregation Operators

Array Operators

// Sample document with arrays
db.surveys.insertOne({
    _id: 'survey1',
    respondent: 'John Doe',
    scores: [85, 92, 78, 90, 88],
    categories: ['tech', 'design', 'business'],
    responses: [
        { question: 'Q1', score: 85 },
        { question: 'Q2', score: 92 },
        { question: 'Q3', score: 78 },
    ],
});

// Array operations
db.surveys.aggregate([
    {
        $project: {
            respondent: 1,
            totalScores: { $sum: '$scores' },
            avgScore: { $avg: '$scores' },
            maxScore: { $max: '$scores' },
            minScore: { $min: '$scores' },
            scoreCount: { $size: '$scores' },
            firstScore: { $arrayElemAt: ['$scores', 0] },
            lastScore: { $arrayElemAt: ['$scores', -1] },
            highScores: {
                $filter: {
                    input: '$scores',
                    cond: { $gte: ['$$this', 90] },
                },
            },
            categoryList: {
                $reduce: {
                    input: '$categories',
                    initialValue: '',
                    in: {
                        $concat: [
                            '$$value',
                            { $cond: [{ $eq: ['$$value', ''] }, '', ', '] },
                            '$$this',
                        ],
                    },
                },
            },
        },
    },
]);
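`$filter` and `$reduce` read much like JavaScript's `Array.prototype.filter` and `Array.prototype.reduce`: `$$this` is the current element and, for `$reduce`, `$$value` is the accumulator that starts at `initialValue`. The plain JavaScript below computes the same `highScores` and `categoryList` values for the sample survey, as a way to read the expressions rather than as a description of MongoDB internals.

```javascript
const survey = {
    scores: [85, 92, 78, 90, 88],
    categories: ['tech', 'design', 'business'],
};

// $filter with cond: { $gte: ['$$this', 90] }
const highScores = survey.scores.filter((thisValue) => thisValue >= 90);

// $reduce with initialValue '' and in: { $concat: ['$$value', separator, '$$this'] }
const categoryList = survey.categories.reduce(
    (value, thisValue) => value + (value === '' ? '' : ', ') + thisValue,
    '',
);

console.log(highScores); // [ 92, 90 ]
console.log(categoryList); // tech, design, business
```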

Date Operators

// Date manipulations
db.sales.aggregate([
    {
        $project: {
            product: 1,
            saleDate: 1,
            year: { $year: '$saleDate' },
            month: { $month: '$saleDate' },
            dayOfWeek: { $dayOfWeek: '$saleDate' }, // 1 (Sunday) to 7 (Saturday)
            dayOfYear: { $dayOfYear: '$saleDate' },
            weekOfYear: { $week: '$saleDate' },
            quarter: {
                $ceil: { $divide: [{ $month: '$saleDate' }, 3] },
            },
            formattedDate: {
                $dateToString: {
                    format: '%Y-%m-%d',
                    date: '$saleDate',
                },
            },
            // Date arithmetic ($dateSubtract requires MongoDB 5.0+)
            thirtyDaysBeforeSale: {
                $dateSubtract: {
                    startDate: '$saleDate',
                    unit: 'day',
                    amount: 30,
                },
            },
        },
    },
]);

String Operators

// String manipulations
db.sales.aggregate([
    {
        $project: {
            product: 1,
            productUpper: { $toUpper: '$product' },
            productLower: { $toLower: '$product' },
            productLength: { $strLenCP: '$product' },
            productSubstring: { $substrCP: ['$product', 0, 3] }, // $substrCP counts code points; $substr is deprecated
            customerFirstName: {
                $arrayElemAt: [{ $split: ['$customer.name', ' '] }, 0],
            },
            customerLastName: {
                $arrayElemAt: [{ $split: ['$customer.name', ' '] }, -1],
            },
            productCode: {
                $concat: [{ $substrCP: ['$category', 0, 2] }, '-', { $substrCP: ['$product', 0, 3] }],
            },
        },
    },
]);

Complex Aggregation Examples

1. Sales Analytics Dashboard

// Comprehensive sales analysis
db.sales.aggregate([
    // Add calculated fields
    {
        $addFields: {
            total: { $multiply: ['$price', '$quantity'] },
            saleMonth: { $month: '$saleDate' },
            saleYear: { $year: '$saleDate' },
        },
    },
    // Multiple facets for dashboard
    {
        $facet: {
            // Monthly sales trend
            monthlySales: [
                {
                    $group: {
                        _id: { year: '$saleYear', month: '$saleMonth' },
                        revenue: { $sum: '$total' },
                        orders: { $sum: 1 },
                        avgOrderValue: { $avg: '$total' },
                    },
                },
                { $sort: { '_id.year': 1, '_id.month': 1 } },
            ],

            // Top salespeople
            topSalespeople: [
                {
                    $group: {
                        _id: '$salesperson',
                        totalRevenue: { $sum: '$total' },
                        totalOrders: { $sum: 1 },
                        avgOrderValue: { $avg: '$total' },
                    },
                },
                { $sort: { totalRevenue: -1 } },
                { $limit: 5 },
            ],

            // Category performance
            categoryPerformance: [
                {
                    $group: {
                        _id: '$category',
                        revenue: { $sum: '$total' },
                        units: { $sum: '$quantity' },
                        avgPrice: { $avg: '$price' },
                    },
                },
                { $sort: { revenue: -1 } },
            ],

            // Regional analysis
            regionalAnalysis: [
                {
                    $group: {
                        _id: '$customer.region',
                        revenue: { $sum: '$total' },
                        customers: { $addToSet: '$customer.email' },
                    },
                },
                {
                    $project: {
                        region: '$_id',
                        revenue: 1,
                        customerCount: { $size: '$customers' },
                        avgRevenuePerCustomer: { $divide: ['$revenue', { $size: '$customers' }] },
                    },
                },
                { $sort: { revenue: -1 } },
            ],
        },
    },
]);

2. Customer Segmentation

// RFM Analysis (Recency, Frequency, Monetary)
db.sales.aggregate([
    // Add total and days since last purchase
    {
        $addFields: {
            total: { $multiply: ['$price', '$quantity'] },
            daysSinceLastPurchase: {
                $divide: [
                    { $subtract: ['$$NOW', '$saleDate'] }, // $$NOW is the server's current time (MongoDB 4.2+)
                    86400000, // milliseconds in a day
                ],
            },
        },
    },

    // Group by customer
    {
        $group: {
            _id: '$customer.email',
            customerName: { $first: '$customer.name' },
            region: { $first: '$customer.region' },
            recency: { $min: '$daysSinceLastPurchase' }, // Days since last purchase
            frequency: { $sum: 1 }, // Number of purchases
            monetary: { $sum: '$total' }, // Total spent
            firstPurchase: { $min: '$saleDate' },
            lastPurchase: { $max: '$saleDate' },
        },
    },

    // Create customer segments
    {
        $addFields: {
            recencyScore: {
                $cond: [
                    { $lte: ['$recency', 30] },
                    5,
                    {
                        $cond: [
                            { $lte: ['$recency', 60] },
                            4,
                            {
                                $cond: [
                                    { $lte: ['$recency', 90] },
                                    3,
                                    { $cond: [{ $lte: ['$recency', 180] }, 2, 1] },
                                ],
                            },
                        ],
                    },
                ],
            },
            frequencyScore: {
                $cond: [
                    { $gte: ['$frequency', 5] },
                    5,
                    {
                        $cond: [
                            { $gte: ['$frequency', 3] },
                            4,
                            { $cond: [{ $gte: ['$frequency', 2] }, 3, 2] },
                        ],
                    },
                ],
            },
            monetaryScore: {
                $cond: [
                    { $gte: ['$monetary', 1000] },
                    5,
                    {
                        $cond: [
                            { $gte: ['$monetary', 500] },
                            4,
                            {
                                $cond: [
                                    { $gte: ['$monetary', 200] },
                                    3,
                                    { $cond: [{ $gte: ['$monetary', 50] }, 2, 1] },
                                ],
                            },
                        ],
                    },
                ],
            },
        },
    },

    // Determine customer segment
    {
        $addFields: {
            rfmScore: { $add: ['$recencyScore', '$frequencyScore', '$monetaryScore'] },
            segment: {
                $switch: {
                    branches: [
                        {
                            case: {
                                $and: [
                                    { $gte: ['$recencyScore', 4] },
                                    { $gte: ['$frequencyScore', 4] },
                                    { $gte: ['$monetaryScore', 4] },
                                ],
                            },
                            then: 'Champions',
                        },
                        {
                            case: {
                                $and: [
                                    { $gte: ['$recencyScore', 3] },
                                    { $gte: ['$frequencyScore', 3] },
                                    { $gte: ['$monetaryScore', 3] },
                                ],
                            },
                            then: 'Loyal Customers',
                        },
                        {
                            case: {
                                $and: [
                                    { $gte: ['$recencyScore', 4] },
                                    { $lte: ['$frequencyScore', 2] },
                                ],
                            },
                            then: 'New Customers',
                        },
                        {
                            case: {
                                $and: [
                                    { $lte: ['$recencyScore', 2] },
                                    { $gte: ['$monetaryScore', 3] },
                                ],
                            },
                            then: 'At Risk',
                        },
                    ],
                    default: 'Others',
                },
            },
        },
    },

    { $sort: { rfmScore: -1 } },
]);
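The nested `$cond` expressions form a threshold ladder, and the `$switch` picks the first branch whose condition is true. Written as plain JavaScript the rules are easier to check; the functions below are hypothetical helpers mirroring the pipeline's thresholds, and the 45-day recency in the example is an assumed value.

```javascript
// Recency: fewer days since the last purchase scores higher.
function recencyScore(days) {
    if (days <= 30) return 5;
    if (days <= 60) return 4;
    if (days <= 90) return 3;
    if (days <= 180) return 2;
    return 1;
}

// Frequency: more purchases score higher; a single purchase still scores 2.
function frequencyScore(purchases) {
    if (purchases >= 5) return 5;
    if (purchases >= 3) return 4;
    if (purchases >= 2) return 3;
    return 2;
}

// Monetary: total spend thresholds.
function monetaryScore(total) {
    if (total >= 1000) return 5;
    if (total >= 500) return 4;
    if (total >= 200) return 3;
    if (total >= 50) return 2;
    return 1;
}

// Same branch order as the $switch: the first matching rule wins.
function segment(r, f, m) {
    if (r >= 4 && f >= 4 && m >= 4) return 'Champions';
    if (r >= 3 && f >= 3 && m >= 3) return 'Loyal Customers';
    if (r >= 4 && f <= 2) return 'New Customers';
    if (r <= 2 && m >= 3) return 'At Risk';
    return 'Others';
}

// Alice: one Laptop sale (2 x 1200 = 2400), assumed to be 45 days ago.
const scores = [recencyScore(45), frequencyScore(1), monetaryScore(2400)];
console.log(scores); // [ 4, 2, 5 ]
console.log(segment(...scores)); // New Customers
```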

3. Cohort Analysis

// Customer cohort analysis
db.sales.aggregate([
    // Add helper fields
    {
        $addFields: {
            total: { $multiply: ['$price', '$quantity'] },
            orderMonth: {
                $dateFromParts: {
                    year: { $year: '$saleDate' },
                    month: { $month: '$saleDate' },
                    day: 1,
                },
            },
        },
    },

    // Get first purchase date for each customer
    {
        $group: {
            _id: '$customer.email',
            firstPurchaseMonth: { $min: '$orderMonth' },
            orders: {
                $push: {
                    orderMonth: '$orderMonth',
                    total: '$total',
                },
            },
        },
    },

    // Unwind orders to process each separately
    { $unwind: '$orders' },

    // Calculate whole calendar months since the first purchase ($dateDiff requires MongoDB 5.0+)
    {
        $addFields: {
            monthsSinceFirst: {
                $dateDiff: {
                    startDate: '$firstPurchaseMonth',
                    endDate: '$orders.orderMonth',
                    unit: 'month',
                },
            },
        },
    },

    // Group by cohort and period
    {
        $group: {
            _id: {
                cohortMonth: '$firstPurchaseMonth',
                period: '$monthsSinceFirst',
            },
            customers: { $addToSet: '$_id' },
            revenue: { $sum: '$orders.total' },
        },
    },

    // Order periods so the $push below builds each cohort's array in sequence
    { $sort: { '_id.cohortMonth': 1, '_id.period': 1 } },

    // Collect each cohort's periods with customer counts and revenue
    {
        $group: {
            _id: '$_id.cohortMonth',
            periods: {
                $push: {
                    period: '$_id.period',
                    customers: { $size: '$customers' },
                    revenue: '$revenue',
                },
            },
        },
    },

    { $sort: { _id: 1 } },
]);
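Cohort periods are easiest to reason about as calendar-month differences. Dividing a millisecond gap by a fixed 30 days drifts: 1 January to 1 March is 59 days in a non-leap year, which floors to period 1 instead of 2. The plain JavaScript sketch below compares the two calculations; `approxMonthsBetween` and `calendarMonthsBetween` are hypothetical helpers, and the calendar version counts `year * 12 + month` differences read in UTC.

```javascript
const MS_PER_30_DAYS = 30 * 24 * 60 * 60 * 1000; // 2592000000

// Approximate period: floor(milliseconds / 30 days).
function approxMonthsBetween(start, end) {
    return Math.floor((end - start) / MS_PER_30_DAYS);
}

// Calendar period: difference in (year * 12 + month), using UTC fields.
function calendarMonthsBetween(start, end) {
    return (
        end.getUTCFullYear() * 12 + end.getUTCMonth() -
        (start.getUTCFullYear() * 12 + start.getUTCMonth())
    );
}

const cohortStart = new Date('2023-01-01');
const laterOrder = new Date('2023-03-01');
console.log(approxMonthsBetween(cohortStart, laterOrder)); // 1 (59 days / 30, floored)
console.log(calendarMonthsBetween(cohortStart, laterOrder)); // 2
```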

Performance Optimization

1. Pipeline Optimization

// ✅ Good: Filter early with $match
db.sales.aggregate([
    { $match: { saleDate: { $gte: ISODate('2024-01-01') } } }, // Filter first
    { $group: { _id: '$category', total: { $sum: '$price' } } },
    { $sort: { total: -1 } },
]);

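// ❌ Bad: group every document, then throw most of the groups away
db.sales.aggregate([
    { $group: { _id: '$category', total: { $sum: '$price' } } },
    { $sort: { total: -1 } },
    { $match: { _id: 'Electronics' } }, // Use { $match: { category: 'Electronics' } } before $group instead
]);

// Filters on values computed by $group, such as { total: { $gte: 1000 } }, necessarily come after it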

2. Index Usage

// Create indexes to support aggregation stages
db.sales.createIndex({ saleDate: 1 }); // For $match on dates
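db.sales.createIndex({ category: 1, price: 1 }); // For a leading $match on category plus $sort on price
db.sales.createIndex({ 'customer.region': 1 }); // For nested field queries

// Only a $match (and a $sort) at the start of the pipeline can use indexes
db.sales.aggregate([
    { $match: { category: 'Electronics' } }, // Can use the { category: 1, price: 1 } index
    // ... rest of pipeline
]);

// Check which plan was chosen
db.sales.explain('executionStats').aggregate([{ $match: { category: 'Electronics' } }]);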

3. Memory Usage

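// Each pipeline stage is limited to 100 MB of RAM; allowDiskUse lets stages
// such as $group and $sort write temporary files instead of failing
db.sales.aggregate(
    [
        // ... complex pipeline
    ],
    { allowDiskUse: true },
);

// Read the configured memory limit for blocking sorts (parameter name in MongoDB 4.4+)
db.adminCommand({ getParameter: 1, internalQueryMaxBlockingSortMemoryUsageBytes: 1 });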

Common Aggregation Patterns

1. Top N per Group

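// Top 2 products per category by revenue
// (on MongoDB 5.2+, the $topN accumulator returns the top documents per group without pushing them all)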
db.sales.aggregate([
    {
        $addFields: {
            revenue: { $multiply: ['$price', '$quantity'] },
        },
    },
    { $sort: { category: 1, revenue: -1 } },
    {
        $group: {
            _id: '$category',
            topProducts: { $push: '$$ROOT' },
        },
    },
    {
        $project: {
            category: '$_id',
            topProducts: { $slice: ['$topProducts', 2] },
        },
    },
]);
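The sort, `$push`, then `$slice` pattern relies on `$group` receiving documents in sorted order, so each group's array is already ranked before it is trimmed. The same idea in plain JavaScript over the five sample sales, with revenue precomputed; `topNPerGroup` is a hypothetical helper.

```javascript
function topNPerGroup(docs, groupKey, rankKey, n) {
    // Sort by group ascending, then rank descending, like { $sort: { category: 1, revenue: -1 } }.
    const sorted = [...docs].sort((a, b) =>
        a[groupKey] < b[groupKey] ? -1 : a[groupKey] > b[groupKey] ? 1 : b[rankKey] - a[rankKey],
    );
    // Push documents into their group in that order ($push), then keep the first n ($slice).
    const groups = new Map();
    for (const doc of sorted) {
        if (!groups.has(doc[groupKey])) groups.set(doc[groupKey], []);
        groups.get(doc[groupKey]).push(doc);
    }
    return [...groups].map(([key, list]) => ({ _id: key, topProducts: list.slice(0, n) }));
}

const sales = [
    { product: 'Laptop', category: 'Electronics', revenue: 2400 },
    { product: 'Mouse', category: 'Electronics', revenue: 125 },
    { product: 'Desk Chair', category: 'Furniture', revenue: 300 },
    { product: 'Monitor', category: 'Electronics', revenue: 1200 },
    { product: 'Keyboard', category: 'Electronics', revenue: 300 },
];

const top = topNPerGroup(sales, 'category', 'revenue', 2);
console.log(top.map((g) => [g._id, g.topProducts.map((d) => d.product)]));
// Electronics: Laptop, Monitor; Furniture: Desk Chair
```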

2. Moving Averages

// 3-day moving average of sales ($setWindowFields requires MongoDB 5.0+)
// Assumes a dailySales collection with one { date, sales } document per day
db.dailySales.aggregate([
    {
        $setWindowFields: {
            sortBy: { date: 1 },
            output: {
                movingAvg: {
                    $avg: '$sales',
                    window: {
                        documents: [-2, 0], // Current + 2 previous days
                    },
                },
            },
        },
    },
]);
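With `window: { documents: [-2, 0] }`, each output averages the current document and up to two documents before it in `sortBy` order; at the start of the sequence the window simply holds fewer documents. A plain JavaScript sketch of that trailing window, using a hypothetical `movingAverage` helper and made-up daily totals:

```javascript
function movingAverage(values, lower = -2, upper = 0) {
    return values.map((_, i) => {
        // Clamp the window [i + lower, i + upper] to the bounds of the array.
        const start = Math.max(0, i + lower);
        const end = Math.min(values.length - 1, i + upper);
        const window = values.slice(start, end + 1);
        return window.reduce((sum, v) => sum + v, 0) / window.length;
    });
}

const dailySales = [10, 20, 30, 40]; // hypothetical daily totals, already sorted by date
console.log(movingAverage(dailySales)); // [ 10, 15, 20, 30 ]
```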

3. Pivot Tables

// Sales by category and region (pivot table)
db.sales.aggregate([
    {
        $addFields: {
            revenue: { $multiply: ['$price', '$quantity'] },
        },
    },
    {
        $group: {
            _id: {
                category: '$category',
                region: '$customer.region',
            },
            revenue: { $sum: '$revenue' },
        },
    },
    {
        $group: {
            _id: '$_id.category',
            regions: {
                $push: {
                    region: '$_id.region',
                    revenue: '$revenue',
                },
            },
            totalRevenue: { $sum: '$revenue' },
        },
    },
]);
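The pivot above stores regions as an array of `{ region, revenue }` pairs. If one field per region is preferred, a later stage could use `$arrayToObject`, which accepts an array of `{ k, v }` documents. The plain JavaScript sketch below shows the equivalent reshaping with `Object.fromEntries` on the Electronics row computed from the sample sales (MongoDB may return the regions in a different order); `toColumns` is a hypothetical helper.

```javascript
// Output of the second $group for Electronics, computed from the sample sales.
const electronics = {
    _id: 'Electronics',
    regions: [
        { region: 'North', revenue: 2400 },
        { region: 'South', revenue: 125 },
        { region: 'West', revenue: 1200 },
        { region: 'East', revenue: 300 },
    ],
    totalRevenue: 4025,
};

// Turn [{ region, revenue }] into { North: ..., South: ... }, one column per region.
function toColumns(row) {
    return {
        category: row._id,
        ...Object.fromEntries(row.regions.map((r) => [r.region, r.revenue])),
        total: row.totalRevenue,
    };
}

console.log(toColumns(electronics).West); // 1200
```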

What's Next?

Now that you've mastered the Aggregation Pipeline, you're ready to integrate MongoDB into your applications (Node.js Integration) or learn about Production Deployment.



This is Part 6 of the MongoDB Zero to Hero series. The Aggregation Pipeline is one of MongoDB's most powerful features; master it to unlock advanced data processing capabilities.
