MongoDB Advanced Topics: Expert-Level Features

Welcome to the final part of our MongoDB Zero to Hero series. After mastering monitoring and maintenance, it's time to explore MongoDB's advanced features that enable sophisticated applications and use cases.

Multi-Document Transactions

MongoDB 4.0+ supports ACID transactions across multiple documents and collections on replica sets; MongoDB 4.2+ extends them to sharded clusters.

Basic Transaction Usage

// Node.js with MongoDB Driver
const session = client.startSession();

try {
    await session.withTransaction(
        async () => {
            // Transfer money between accounts
            const accounts = db.collection('accounts');

            // Debit from source account; throwing here aborts the whole transaction
            const debit = await accounts.updateOne(
                { accountId: 'A123', balance: { $gte: 100 } },
                { $inc: { balance: -100 } },
                { session },
            );

            if (debit.matchedCount === 0) {
                throw new Error('Insufficient funds in account A123');
            }

            // Credit to destination account
            await accounts.updateOne(
                { accountId: 'B456' },
                { $inc: { balance: 100 } },
                { session },
            );

            // Log the transaction
            await db.collection('transactions').insertOne(
                {
                    from: 'A123',
                    to: 'B456',
                    amount: 100,
                    timestamp: new Date(),
                    type: 'transfer',
                },
                { session },
            );
        },
        {
            readConcern: { level: 'majority' },
            writeConcern: { w: 'majority' },
            readPreference: 'primary',
        },
    );

    console.log('Transaction completed successfully');
} catch (error) {
    console.error('Transaction failed:', error);
} finally {
    await session.endSession();
}

Complex Multi-Collection Transaction

// E-commerce order processing with inventory management
async function processOrder(orderData) {
    const session = client.startSession();

    try {
        // Note: withTransaction() returns the callback's return value in driver v5+
        const result = await session.withTransaction(async () => {
            const orders = db.collection('orders');
            const inventory = db.collection('inventory');
            const customers = db.collection('customers');
            const notifications = db.collection('notifications');

            // 1. Create the order
            const order = {
                _id: new ObjectId(),
                customerId: orderData.customerId,
                items: orderData.items,
                total: orderData.total,
                status: 'pending',
                createdAt: new Date(),
            };

            await orders.insertOne(order, { session });

            // 2. Update inventory for each item
            for (const item of orderData.items) {
                const updateResult = await inventory.updateOne(
                    {
                        productId: item.productId,
                        quantity: { $gte: item.quantity },
                        reserved: { $exists: true },
                    },
                    {
                        $inc: {
                            quantity: -item.quantity,
                            reserved: item.quantity,
                        },
                    },
                    { session },
                );

                if (updateResult.matchedCount === 0) {
                    throw new Error(`Insufficient inventory for product ${item.productId}`);
                }
            }

            // 3. Update customer order history
            await customers.updateOne(
                { _id: orderData.customerId },
                {
                    $push: { orderHistory: order._id },
                    $inc: { totalSpent: orderData.total },
                },
                { session },
            );

            // 4. Create notification
            await notifications.insertOne(
                {
                    userId: orderData.customerId,
                    type: 'order_confirmation',
                    orderId: order._id,
                    message: `Order ${order._id} has been placed successfully`,
                    createdAt: new Date(),
                    read: false,
                },
                { session },
            );

            return order;
        });

        return { success: true, order: result };
    } catch (error) {
        return { success: false, error: error.message };
    } finally {
        await session.endSession();
    }
}

// Usage
const orderResult = await processOrder({
    customerId: new ObjectId('65a1b2c3d4e5f6a7b8c9d0e1'), // must be a valid 24-char hex id
    items: [
        { productId: 'laptop001', quantity: 1, price: 999.99 },
        { productId: 'mouse001', quantity: 2, price: 29.99 },
    ],
    total: 1059.97,
});

Transaction Best Practices

// Transaction configuration for different scenarios
const transactionConfigs = {
    // High consistency requirements
    financial: {
        readConcern: { level: 'majority' },
        writeConcern: { w: 'majority', j: true },
        readPreference: 'primary',
        maxCommitTimeMS: 5000,
    },

    // Balanced performance and consistency
    standard: {
        readConcern: { level: 'local' },
        writeConcern: { w: 'majority' },
        readPreference: 'primary',
        maxCommitTimeMS: 3000,
    },

    // Performance optimized (eventual consistency acceptable)
    eventual: {
        readConcern: { level: 'local' },
        writeConcern: { w: 1 },
        readPreference: 'primaryPreferred',
        maxCommitTimeMS: 1000,
    },
};

// Retry a transaction with exponential backoff.
// Note: withTransaction() already retries TransientTransactionError internally;
// this wrapper adds a bounded outer retry with backoff on top of that.
async function executeTransactionWithRetry(transactionFunction, maxRetries = 3) {
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
        const session = client.startSession();

        try {
            const result = await session.withTransaction(
                transactionFunction,
                transactionConfigs.standard,
            );

            await session.endSession();
            return { success: true, result };
        } catch (error) {
            await session.endSession();

            // Check if error is retryable
            if (error.hasErrorLabel?.('TransientTransactionError') && attempt < maxRetries) {
                const delay = Math.pow(2, attempt) * 100; // Exponential backoff
                await new Promise((resolve) => setTimeout(resolve, delay));
                continue;
            }

            return { success: false, error: error.message, attempt };
        }
    }
}
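The fixed 2^attempt * 100 delay above works, but under heavy contention many clients can end up retrying in lockstep. A common variation is "full jitter" backoff, sketched below (the function name and defaults are illustrative, not part of the driver API):

```javascript
// Full-jitter exponential backoff: pick a uniform delay in [0, base * 2^attempt],
// capped at capMs. Passing `random` explicitly keeps the function testable.
function backoffDelayMs(attempt, baseMs = 100, capMs = 5000, random = Math.random) {
    const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
    return Math.floor(random() * ceiling);
}
```

Swapping this in for the fixed delay spreads retries out instead of synchronizing them across clients.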

Change Streams

Change Streams allow applications to access real-time data changes without polling.

Basic Change Stream

// Watch for changes in a collection
const changeStream = db.collection('orders').watch();

changeStream.on('change', (change) => {
    console.log('Change detected:', change);

    switch (change.operationType) {
        case 'insert':
            handleNewOrder(change.fullDocument);
            break;
        case 'update':
            handleOrderUpdate(change.documentKey._id, change.updateDescription);
            break;
        case 'delete':
            handleOrderDeletion(change.documentKey._id);
            break;
        case 'replace':
            handleOrderReplacement(change.fullDocument);
            break;
    }
});

// Handle new orders
function handleNewOrder(order) {
    console.log('New order received:', order._id);

    // Send notification
    notificationService.send({
        to: order.customerId,
        message: `Order ${order._id} has been placed`,
        type: 'order_confirmation',
    });

    // Update analytics
    analyticsService.recordEvent('order_placed', {
        orderId: order._id,
        amount: order.total,
        items: order.items.length,
    });
}
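A production change stream should also survive restarts. Every event carries a resume token in its _id; persisting that token lets the stream pick up where it left off. A minimal sketch (the resume_tokens checkpoint collection is an illustrative name, not a MongoDB convention):

```javascript
// Build watch() options that resume from a previously saved checkpoint, if any.
function resumeOptionsFrom(tokenDoc) {
    return tokenDoc ? { resumeAfter: tokenDoc.token } : {};
}

// Usage sketch (assumes a connected `db`):
// const saved = await db.collection('resume_tokens').findOne({ _id: 'orders' });
// const stream = db.collection('orders').watch([], resumeOptionsFrom(saved));
// stream.on('change', async (change) => {
//     // handle the event, then checkpoint its token
//     await db.collection('resume_tokens').updateOne(
//         { _id: 'orders' },
//         { $set: { token: change._id } },
//         { upsert: true },
//     );
// });
```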

Advanced Change Stream with Pipeline

// Watch for specific changes with aggregation pipeline
const pipeline = [
    {
        $match: {
            operationType: { $in: ['insert', 'update'] },
            'fullDocument.status': 'completed',
            'fullDocument.total': { $gte: 100 },
        },
    },
    {
        $project: {
            _id: 1,
            operationType: 1,
            'fullDocument._id': 1,
            'fullDocument.customerId': 1,
            'fullDocument.total': 1,
            'fullDocument.status': 1,
        },
    },
];

const changeStream = db.collection('orders').watch(pipeline, {
    fullDocument: 'updateLookup',
});

changeStream.on('change', (change) => {
    console.log('High-value order completed:', change.fullDocument);

    // Process high-value order completion
    processHighValueOrder(change.fullDocument);
});

async function processHighValueOrder(order) {
    // Award loyalty points
    await db
        .collection('customers')
        .updateOne(
            { _id: order.customerId },
            { $inc: { loyaltyPoints: Math.floor(order.total / 10) } },
        );

    // Send personalized thank you
    await emailService.sendTemplate('high_value_thanks', {
        customerId: order.customerId,
        orderTotal: order.total,
    });

    // Update customer tier if applicable
    const customer = await db.collection('customers').findOne({ _id: order.customerId });
    if (customer?.totalSpent > 1000 && customer.tier !== 'gold') {
        await customerService.upgradeTier(customer._id, 'gold');
    }
}

Real-time Application Updates

// Real-time dashboard using Change Streams
class RealTimeDashboard {
    constructor(io) {
        this.io = io; // Socket.io instance
        this.setupChangeStreams();
    }

    setupChangeStreams() {
        // Watch orders for real-time sales tracking
        const ordersChangeStream = db
            .collection('orders')
            .watch([{ $match: { operationType: { $in: ['insert', 'update'] } } }], {
                fullDocument: 'updateLookup', // needed so update events carry the full document
            });

        ordersChangeStream.on('change', (change) => {
            this.handleOrderChange(change);
        });

        // Watch user activity
        const usersChangeStream = db.collection('users').watch([
            {
                $match: {
                    operationType: 'update',
                    'updateDescription.updatedFields.lastActive': { $exists: true },
                },
            },
        ]);

        usersChangeStream.on('change', (change) => {
            this.handleUserActivity(change);
        });

        // Watch inventory levels
        const inventoryChangeStream = db
            .collection('inventory')
            .watch([{ $match: { operationType: 'update', 'fullDocument.quantity': { $lt: 10 } } }], {
                fullDocument: 'updateLookup', // populate fullDocument so the quantity filter works
            });

        inventoryChangeStream.on('change', (change) => {
            this.handleLowInventory(change);
        });
    }

    handleOrderChange(change) {
        const order = change.fullDocument;

        // Emit real-time updates to connected clients
        this.io.emit('order_update', {
            type: change.operationType,
            orderId: order._id,
            status: order.status,
            total: order.total,
            timestamp: new Date(),
        });

        // Update daily sales counter
        if (change.operationType === 'insert') {
            this.updateDailySales(order.total);
        }
    }

    handleUserActivity(change) {
        this.io.emit('user_activity', {
            userId: change.documentKey._id,
            lastActive: change.updateDescription.updatedFields.lastActive,
            timestamp: new Date(),
        });
    }

    handleLowInventory(change) {
        const product = change.fullDocument;

        // Alert inventory managers
        this.io.to('inventory_managers').emit('low_inventory_alert', {
            productId: product._id,
            productName: product.name,
            currentQuantity: product.quantity,
            threshold: 10,
        });
    }

    async updateDailySales(amount) {
        const today = new Date().toISOString().split('T')[0];

        await db.collection('daily_sales').updateOne(
            { date: today },
            {
                $inc: { total: amount, count: 1 },
                $setOnInsert: { date: today },
            },
            { upsert: true },
        );
    }
}

GridFS for Large File Storage

GridFS works around the 16MB BSON document limit by splitting files into chunks (255KB by default), stored in paired .files and .chunks collections.

Basic GridFS Operations

const fs = require('fs');
const { GridFSBucket } = require('mongodb');

// Create GridFS bucket
const bucket = new GridFSBucket(db, { bucketName: 'files' });

// Upload file
async function uploadFile(filePath, filename, metadata = {}) {
    return new Promise((resolve, reject) => {
        const uploadStream = bucket.openUploadStream(filename, {
            metadata: {
                ...metadata,
                uploadDate: new Date(),
                uploader: 'system',
            },
        });

        const readStream = fs.createReadStream(filePath);

        uploadStream.on('error', reject);
        uploadStream.on('finish', () => {
            resolve({
                fileId: uploadStream.id,
                filename: filename,
                length: uploadStream.length,
            });
        });

        readStream.pipe(uploadStream);
    });
}

// Download file
async function downloadFile(fileId, downloadPath) {
    return new Promise((resolve, reject) => {
        const downloadStream = bucket.openDownloadStream(fileId);
        const writeStream = fs.createWriteStream(downloadPath);

        downloadStream.on('error', reject);
        writeStream.on('error', reject);
        writeStream.on('finish', resolve);

        downloadStream.pipe(writeStream);
    });
}

// Stream file to HTTP response
function streamFileToResponse(fileId, res) {
    const downloadStream = bucket.openDownloadStream(fileId);

    downloadStream.on('error', (error) => {
        res.status(404).json({ error: 'File not found' });
    });

    downloadStream.on('file', (file) => {
        res.set({
            'Content-Type': file.metadata?.contentType || 'application/octet-stream',
            'Content-Length': file.length,
            'Content-Disposition': `attachment; filename="${file.filename}"`,
        });
    });

    downloadStream.pipe(res);
}

// Delete file
async function deleteFile(fileId) {
    try {
        await bucket.delete(fileId);
        return { success: true };
    } catch (error) {
        return { success: false, error: error.message };
    }
}

Advanced GridFS File Management

class GridFSManager {
    constructor(db, bucketName = 'files') {
        this.bucket = new GridFSBucket(db, { bucketName });
        this.db = db;
    }

    // Upload with progress tracking
    async uploadWithProgress(filePath, filename, metadata = {}, onProgress) {
        const stats = fs.statSync(filePath);
        const totalSize = stats.size;
        let uploadedSize = 0;

        return new Promise((resolve, reject) => {
            const uploadStream = this.bucket.openUploadStream(filename, {
                chunkSizeBytes: 1024 * 1024, // 1MB chunks
                metadata: {
                    ...metadata,
                    originalSize: totalSize,
                    uploadDate: new Date(),
                },
            });

            const readStream = fs.createReadStream(filePath);

            readStream.on('data', (chunk) => {
                uploadedSize += chunk.length;
                const progress = (uploadedSize / totalSize) * 100;
                onProgress && onProgress(Math.round(progress));
            });

            uploadStream.on('error', reject);
            uploadStream.on('finish', () => {
                resolve({
                    fileId: uploadStream.id,
                    filename: filename,
                    size: uploadStream.length,
                });
            });

            readStream.pipe(uploadStream);
        });
    }

    // List files with pagination
    async listFiles(options = {}) {
        const {
            page = 1,
            limit = 20,
            sortBy = 'uploadDate',
            sortOrder = -1,
            filter = {},
        } = options;

        const skip = (page - 1) * limit;

        const files = await this.bucket
            .find(filter)
            .sort({ [sortBy]: sortOrder })
            .skip(skip)
            .limit(limit)
            .toArray();

        const totalCount = await this.bucket.find(filter).count();

        return {
            files: files.map((file) => ({
                _id: file._id,
                filename: file.filename,
                length: file.length,
                uploadDate: file.uploadDate,
                metadata: file.metadata,
            })),
            pagination: {
                currentPage: page,
                totalPages: Math.ceil(totalCount / limit),
                totalCount,
                hasNext: skip + files.length < totalCount,
                hasPrev: page > 1,
            },
        };
    }

    // File information
    async getFileInfo(fileId) {
        try {
            const files = await this.bucket.find({ _id: fileId }).toArray();
            return files[0] || null;
        } catch (error) {
            return null;
        }
    }

    // Resize images using Sharp
    async resizeImage(fileId, width, height) {
        const sharp = require('sharp');

        return new Promise((resolve, reject) => {
            const downloadStream = this.bucket.openDownloadStream(fileId);
            const chunks = [];

            downloadStream.on('data', (chunk) => chunks.push(chunk));
            downloadStream.on('error', reject);
            downloadStream.on('end', async () => {
                try {
                    const buffer = Buffer.concat(chunks);
                    const resizedBuffer = await sharp(buffer)
                        .resize(width, height)
                        .jpeg({ quality: 80 })
                        .toBuffer();

                    // Upload resized image
                    const originalFile = await this.getFileInfo(fileId);
                    const resizedFilename = `${originalFile.filename}_${width}x${height}.jpg`;

                    const uploadResult = await this.uploadBuffer(resizedBuffer, resizedFilename, {
                        originalFileId: fileId,
                        resized: true,
                        dimensions: { width, height },
                    });

                    resolve(uploadResult);
                } catch (error) {
                    reject(error);
                }
            });
        });
    }

    // Upload from buffer
    async uploadBuffer(buffer, filename, metadata = {}) {
        return new Promise((resolve, reject) => {
            const uploadStream = this.bucket.openUploadStream(filename, {
                metadata: {
                    ...metadata,
                    uploadDate: new Date(),
                    size: buffer.length,
                },
            });

            uploadStream.on('error', reject);
            uploadStream.on('finish', () => {
                resolve({
                    fileId: uploadStream.id,
                    filename: filename,
                    size: buffer.length,
                });
            });

            uploadStream.end(buffer);
        });
    }

    // Cleanup orphaned chunks (pass the same bucket name used in the constructor;
    // GridFSBucket does not expose its bucket name publicly)
    async cleanupOrphanedChunks(bucketName = 'files') {
        const chunksCollection = this.db.collection(`${bucketName}.chunks`);

        // Find chunks without corresponding files
        const orphanedChunks = await chunksCollection
            .aggregate([
                {
                    $lookup: {
                        from: `${bucketName}.files`,
                        localField: 'files_id',
                        foreignField: '_id',
                        as: 'file',
                    },
                },
                {
                    $match: { file: { $size: 0 } },
                },
            ])
            .toArray();

        if (orphanedChunks.length > 0) {
            const orphanedIds = orphanedChunks.map((chunk) => chunk._id);
            const result = await chunksCollection.deleteMany({ _id: { $in: orphanedIds } });
            return { deletedChunks: result.deletedCount };
        }

        return { deletedChunks: 0 };
    }
}

Time Series Collections

MongoDB 5.0+ supports optimized time series collections for time-stamped data.

Creating Time Series Collections

// Create time series collection
await db.createCollection('weather_data', {
    timeseries: {
        timeField: 'timestamp',
        metaField: 'sensor',
        granularity: 'hours',
    },
});

// Create time series collection for stock prices
await db.createCollection('stock_prices', {
    timeseries: {
        timeField: 'date',
        metaField: 'symbol',
        granularity: 'minutes',
    },
});
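Time series collections can also expire old measurements automatically via the expireAfterSeconds option on createCollection. A sketch with a 30-day retention window (the collection name is illustrative):

```javascript
// Retention window for raw readings: 30 days, expressed in seconds.
const THIRTY_DAYS_SECONDS = 60 * 60 * 24 * 30;

// Usage sketch (assumes a connected `db`):
// await db.createCollection('weather_data_raw', {
//     timeseries: { timeField: 'timestamp', metaField: 'sensor', granularity: 'hours' },
//     expireAfterSeconds: THIRTY_DAYS_SECONDS, // documents older than 30 days are removed
// });
```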

// Insert time series data
await db.collection('weather_data').insertMany([
    {
        timestamp: new Date('2024-01-15T10:00:00Z'),
        sensor: { id: 'sensor001', location: 'New York' },
        temperature: 22.5,
        humidity: 65,
        pressure: 1013.25,
    },
    {
        timestamp: new Date('2024-01-15T11:00:00Z'),
        sensor: { id: 'sensor001', location: 'New York' },
        temperature: 23.1,
        humidity: 63,
        pressure: 1013.5,
    },
    {
        timestamp: new Date('2024-01-15T10:00:00Z'),
        sensor: { id: 'sensor002', location: 'Boston' },
        temperature: 18.7,
        humidity: 70,
        pressure: 1012.8,
    },
]);

Time Series Queries and Aggregations

// Time-based queries
async function getWeatherDataForPeriod(sensorId, startDate, endDate) {
    return await db
        .collection('weather_data')
        .find({
            'sensor.id': sensorId,
            timestamp: {
                $gte: startDate,
                $lte: endDate,
            },
        })
        .sort({ timestamp: 1 })
        .toArray();
}

// Aggregation pipelines for time series
async function getHourlyAverages(sensorId, date) {
    return await db
        .collection('weather_data')
        .aggregate([
            {
                $match: {
                    'sensor.id': sensorId,
                    timestamp: {
                        $gte: new Date(date + 'T00:00:00Z'),
                        // exclusive upper bound: start of the next UTC day
                        $lt: new Date(new Date(date + 'T00:00:00Z').getTime() + 86400000),
                    },
                },
            },
            {
                $group: {
                    _id: {
                        hour: { $hour: '$timestamp' },
                    },
                    avgTemperature: { $avg: '$temperature' },
                    avgHumidity: { $avg: '$humidity' },
                    avgPressure: { $avg: '$pressure' },
                    count: { $sum: 1 },
                },
            },
            {
                $sort: { '_id.hour': 1 },
            },
        ])
        .toArray();
}

// Moving averages
async function getMovingAverage(sensorId, windowSize = 5) {
    return await db
        .collection('weather_data')
        .aggregate([
            {
                $match: { 'sensor.id': sensorId },
            },
            {
                $sort: { timestamp: 1 },
            },
            {
                $setWindowFields: {
                    partitionBy: '$sensor.id',
                    sortBy: { timestamp: 1 },
                    output: {
                        movingAvgTemp: {
                            $avg: '$temperature',
                            window: {
                                documents: [-windowSize + 1, 0],
                            },
                        },
                        movingAvgHumidity: {
                            $avg: '$humidity',
                            window: {
                                documents: [-windowSize + 1, 0],
                            },
                        },
                    },
                },
            },
        ])
        .toArray();
}

// Detect anomalies using standard deviation
async function detectTemperatureAnomalies(sensorId, threshold = 2) {
    return await db
        .collection('weather_data')
        .aggregate([
            {
                $match: { 'sensor.id': sensorId },
            },
            {
                $setWindowFields: {
                    partitionBy: '$sensor.id',
                    sortBy: { timestamp: 1 },
                    output: {
                        avgTemp: {
                            $avg: '$temperature',
                            window: { documents: [-50, 0] },
                        },
                        stdDevTemp: {
                            $stdDevPop: '$temperature',
                            window: { documents: [-50, 0] },
                        },
                    },
                },
            },
            {
                $addFields: {
                    isAnomaly: {
                        $gt: [
                            { $abs: { $subtract: ['$temperature', '$avgTemp'] } },
                            { $multiply: ['$stdDevTemp', threshold] },
                        ],
                    },
                },
            },
            {
                $match: { isAnomaly: true },
            },
        ])
        .toArray();
}
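The anomaly rule in the pipeline above can be mirrored as a plain function, which is handy for unit-testing threshold choices without a database (a sketch; the function name is illustrative):

```javascript
// Same rule as the $addFields stage above:
// anomalous when |value - mean| > threshold * stdDev.
function isTemperatureAnomaly(value, mean, stdDev, threshold = 2) {
    return Math.abs(value - mean) > threshold * stdDev;
}
```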

IoT Data Pipeline with Time Series

// IoT sensor data processing pipeline
class IoTDataProcessor {
    constructor(db, notificationService = null) {
        this.db = db;
        this.notificationService = notificationService;
        this.sensorData = db.collection('sensor_data');
        this.alerts = db.collection('alerts');
    }

    async processSensorReading(reading) {
        // Store raw sensor data
        await this.sensorData.insertOne({
            timestamp: new Date(),
            sensor: {
                id: reading.sensorId,
                type: reading.sensorType,
                location: reading.location,
            },
            ...reading.data,
        });

        // Check for alerts
        await this.checkAlerts(reading);

        // Update real-time aggregations
        await this.updateRealTimeStats(reading);
    }

    async checkAlerts(reading) {
        const thresholds = {
            temperature: { min: 0, max: 40 },
            humidity: { min: 30, max: 80 },
            pressure: { min: 950, max: 1050 },
        };

        for (const [metric, value] of Object.entries(reading.data)) {
            const threshold = thresholds[metric];
            if (threshold) {
                if (value < threshold.min || value > threshold.max) {
                    await this.createAlert({
                        sensorId: reading.sensorId,
                        metric,
                        value,
                        threshold,
                        severity: this.calculateSeverity(value, threshold),
                        timestamp: new Date(),
                    });
                }
            }
        }
    }

    async createAlert(alertData) {
        await this.alerts.insertOne(alertData);

        // Send real-time notification, if a notification service was wired in
        this.notificationService?.send({
            type: 'sensor_alert',
            data: alertData,
        });
    }

    calculateSeverity(value, threshold) {
        const deviation = Math.max(
            Math.abs(value - threshold.min) / threshold.min,
            Math.abs(value - threshold.max) / threshold.max,
        );

        if (deviation > 0.5) return 'critical';
        if (deviation > 0.2) return 'warning';
        return 'info';
    }

    // Get sensor statistics for dashboard
    async getSensorDashboard(sensorId, timeRange = '24h') {
        const timeRanges = {
            '1h': 1000 * 60 * 60,
            '24h': 1000 * 60 * 60 * 24,
            '7d': 1000 * 60 * 60 * 24 * 7,
        };

        const startTime = new Date(Date.now() - (timeRanges[timeRange] ?? timeRanges['24h']));

        const [currentStats, historicalData, alerts] = await Promise.all([
            this.getCurrentStats(sensorId),
            this.getHistoricalData(sensorId, startTime),
            this.getRecentAlerts(sensorId, startTime),
        ]);

        return {
            sensorId,
            timeRange,
            current: currentStats,
            historical: historicalData,
            alerts: alerts,
            lastUpdated: new Date(),
        };
    }

    async getCurrentStats(sensorId) {
        const latest = await this.sensorData.findOne(
            { 'sensor.id': sensorId },
            { sort: { timestamp: -1 } },
        );

        return latest
            ? {
                  timestamp: latest.timestamp,
                  temperature: latest.temperature,
                  humidity: latest.humidity,
                  pressure: latest.pressure,
              }
            : null;
    }

    async getHistoricalData(sensorId, startTime) {
        return await this.sensorData
            .aggregate([
                {
                    $match: {
                        'sensor.id': sensorId,
                        timestamp: { $gte: startTime },
                    },
                },
                {
                    $group: {
                        _id: {
                            hour: { $hour: '$timestamp' },
                            day: { $dayOfMonth: '$timestamp' },
                            month: { $month: '$timestamp' },
                            year: { $year: '$timestamp' },
                        },
                        avgTemperature: { $avg: '$temperature' },
                        minTemperature: { $min: '$temperature' },
                        maxTemperature: { $max: '$temperature' },
                        avgHumidity: { $avg: '$humidity' },
                        avgPressure: { $avg: '$pressure' },
                        readings: { $sum: 1 },
                    },
                },
                {
                    $sort: {
                        '_id.year': 1,
                        '_id.month': 1,
                        '_id.day': 1,
                        '_id.hour': 1,
                    },
                },
            ])
            .toArray();
    }

    async getRecentAlerts(sensorId, startTime) {
        return await this.alerts
            .find({
                sensorId,
                timestamp: { $gte: startTime },
            })
            .sort({ timestamp: -1 })
            .limit(50)
            .toArray();
    }
}

Full-Text Search

MongoDB provides server-side full-text search through text indexes, with per-field weighting and relevance scoring.

Text Index and Basic Search

// Create text index
await db.collection('articles').createIndex(
    {
        title: 'text',
        content: 'text',
        tags: 'text',
    },
    {
        weights: {
            title: 10,
            content: 5,
            tags: 1,
        },
        name: 'article_text_index',
    },
);

// Basic text search
async function searchArticles(searchTerm) {
    return await db
        .collection('articles')
        .find(
            { $text: { $search: searchTerm } },
            // In Node driver 4+, the projection must be nested under `projection`
            { projection: { score: { $meta: 'textScore' } } },
        )
        .sort({
            score: { $meta: 'textScore' },
        })
        .toArray();
}
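The $search string has its own mini-syntax: quoted phrases must match verbatim, and a leading minus excludes a term. A small builder makes that explicit (a sketch; the helper name is illustrative):

```javascript
// Compose a $text filter string: plain terms are OR'd together,
// "quoted phrases" must appear verbatim, and -terms exclude documents.
function buildTextSearch({ terms = [], phrase = null, exclude = [] }) {
    const parts = [...terms];
    if (phrase) parts.push(`"${phrase}"`);
    parts.push(...exclude.map((t) => `-${t}`));
    return { $text: { $search: parts.join(' ') } };
}
```

For example, buildTextSearch({ terms: ['mongodb'], phrase: 'change streams', exclude: ['atlas'] }) produces the search string: mongodb "change streams" -atlas.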

// Advanced text search with filters
async function advancedSearch(query) {
    const { searchTerm, category, author, dateRange, minScore = 0.5 } = query;

    const pipeline = [];

    // Text search match
    if (searchTerm) {
        pipeline.push({
            $match: {
                $text: { $search: searchTerm },
            },
        });

        pipeline.push({
            $addFields: {
                score: { $meta: 'textScore' },
            },
        });

        pipeline.push({
            $match: {
                score: { $gte: minScore },
            },
        });
    }

    // Additional filters
    const additionalMatch = {};

    if (category) {
        additionalMatch.category = category;
    }

    if (author) {
        additionalMatch.author = new RegExp(author, 'i');
    }

    if (dateRange) {
        additionalMatch.publishedDate = {
            $gte: new Date(dateRange.start),
            $lte: new Date(dateRange.end),
        };
    }

    if (Object.keys(additionalMatch).length > 0) {
        pipeline.push({ $match: additionalMatch });
    }

    // Sort by relevance score
    pipeline.push({
        $sort: searchTerm ? { score: -1 } : { publishedDate: -1 },
    });

    return await db.collection('articles').aggregate(pipeline).toArray();
}
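The `$search` string has its own mini-syntax: space-separated terms are OR'd together, quoted phrases must match verbatim, and a term prefixed with `-` excludes documents containing it. A small helper (the function name is illustrative, not part of the driver) that builds such a string from structured input:

```javascript
// Build a $text $search string from structured input.
// Terms are OR'd by default; quoted phrases must appear verbatim;
// a leading "-" excludes documents containing that term.
function buildSearchString({ terms = [], phrases = [], exclude = [] }) {
    return [
        ...terms,
        ...phrases.map((p) => `"${p}"`),
        ...exclude.map((t) => `-${t}`),
    ].join(' ');
}

// Find articles mentioning "mongodb" and the exact phrase
// "text index", but excluding those that mention "atlas":
const search = buildSearchString({
    terms: ['mongodb'],
    phrases: ['text index'],
    exclude: ['atlas'],
});
// search === 'mongodb "text index" -atlas'
// db.collection('articles').find({ $text: { $search: search } })
```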

Advanced Search Features

// Search with highlighting and snippets
class AdvancedSearchService {
    constructor(db) {
        this.db = db;
    }

    async searchWithHighlights(searchTerm, options = {}) {
        const { limit = 20, skip = 0, categories = [], sortBy = 'relevance' } = options;

        const pipeline = [
            {
                $match: {
                    $text: { $search: searchTerm },
                    ...(categories.length > 0 && { category: { $in: categories } }),
                },
            },
            {
                $addFields: {
                    score: { $meta: 'textScore' },
                },
            },
        ];

        // Add sorting
        if (sortBy === 'relevance') {
            pipeline.push({ $sort: { score: -1 } });
        } else if (sortBy === 'date') {
            pipeline.push({ $sort: { publishedDate: -1 } });
        }

        // Pagination
        pipeline.push({ $skip: skip });
        pipeline.push({ $limit: limit });

        // Add highlighting and snippets
        pipeline.push({
            $addFields: {
                highlights: this.generateHighlights('$content', searchTerm),
                snippet: this.generateSnippet('$content', searchTerm, 200),
            },
        });

        const results = await this.db.collection('articles').aggregate(pipeline).toArray();

        return {
            results,
            searchTerm,
            totalFound: await this.getSearchCount(searchTerm, categories),
        };
    }

    generateHighlights(field, searchTerm) {
        // This is a simplified version - in practice, you'd use more sophisticated highlighting
        return {
            $regexFindAll: {
                input: field,
                regex: new RegExp(searchTerm.split(' ').join('|'), 'i'),
            },
        };
    }

    generateSnippet(field, searchTerm, length) {
        // Extract a snippet of `length` characters centered on the first
        // occurrence of the search term, falling back to the start of the
        // content when the term is absent. The regex is built client-side
        // and passed directly to $regexMatch, since aggregation variables
        // cannot hold regex values.
        const searchRegex = new RegExp(searchTerm, 'i');
        return {
            $let: {
                vars: { content: field },
                in: {
                    $cond: {
                        if: { $regexMatch: { input: '$$content', regex: searchRegex } },
                        then: {
                            $substrCP: [
                                '$$content',
                                {
                                    $max: [
                                        0,
                                        {
                                            // Note: $indexOfCP is case-sensitive,
                                            // unlike the regex match above
                                            $subtract: [
                                                { $indexOfCP: ['$$content', searchTerm] },
                                                Math.floor(length / 2),
                                            ],
                                        },
                                    ],
                                },
                                length,
                            ],
                        },
                        else: { $substrCP: ['$$content', 0, length] },
                    },
                },
            },
        };
    }

    async getSearchCount(searchTerm, categories = []) {
        return await this.db.collection('articles').countDocuments({
            $text: { $search: searchTerm },
            ...(categories.length > 0 && { category: { $in: categories } }),
        });
    }

    // Autocomplete suggestions
    async getAutocompleteSuggestions(partialTerm, limit = 10) {
        return await this.db
            .collection('articles')
            .aggregate([
                {
                    $match: {
                        $or: [
                            { title: new RegExp('^' + partialTerm, 'i') },
                            { tags: new RegExp('^' + partialTerm, 'i') },
                        ],
                    },
                },
                {
                    $project: {
                        suggestions: {
                            $concatArrays: [
                                {
                                    $cond: [
                                        {
                                            $regexMatch: {
                                                input: '$title',
                                                regex: new RegExp('^' + partialTerm, 'i'),
                                            },
                                        },
                                        ['$title'],
                                        [],
                                    ],
                                },
                                {
                                    $filter: {
                                        input: '$tags',
                                        cond: {
                                            $regexMatch: {
                                                input: '$$this',
                                                regex: new RegExp('^' + partialTerm, 'i'),
                                            },
                                        },
                                    },
                                },
                            ],
                        },
                    },
                },
                { $unwind: '$suggestions' },
                { $group: { _id: '$suggestions' } },
                { $limit: limit },
                { $project: { _id: 0, suggestion: '$_id' } },
            ])
            .toArray();
    }

    // Search analytics
    async recordSearchQuery(searchTerm, resultsCount, userId = null) {
        await this.db.collection('search_analytics').insertOne({
            searchTerm,
            resultsCount,
            userId,
            timestamp: new Date(),
            sessionId: this.generateSessionId(),
        });
    }

    async getPopularSearchTerms(timeRange = '7d', limit = 10) {
        const timeRanges = {
            '1d': 1000 * 60 * 60 * 24,
            '7d': 1000 * 60 * 60 * 24 * 7,
            '30d': 1000 * 60 * 60 * 24 * 30,
        };

        // Fall back to 7 days if an unknown range key is passed
        const startTime = new Date(Date.now() - (timeRanges[timeRange] ?? timeRanges['7d']));

        return await this.db
            .collection('search_analytics')
            .aggregate([
                {
                    $match: {
                        timestamp: { $gte: startTime },
                        resultsCount: { $gt: 0 },
                    },
                },
                {
                    $group: {
                        _id: '$searchTerm',
                        count: { $sum: 1 },
                        avgResults: { $avg: '$resultsCount' },
                        uniqueUsers: { $addToSet: '$userId' },
                    },
                },
                {
                    $addFields: {
                        uniqueUserCount: { $size: '$uniqueUsers' },
                    },
                },
                {
                    $sort: { count: -1 },
                },
                {
                    $limit: limit,
                },
            ])
            .toArray();
    }

    generateSessionId() {
        return (
            Math.random().toString(36).substring(2, 15) +
            Math.random().toString(36).substring(2, 15)
        );
    }
}
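The snippet expression above runs server-side inside the pipeline, which keeps payloads small but is awkward to debug. For modest result sets, the same windowing can be done client-side after fetching; a minimal sketch (the helper name is illustrative):

```javascript
// Client-side equivalent of generateSnippet: return `length`
// characters centered on the first occurrence of `searchTerm`,
// or the start of the content if the term is absent.
function extractSnippet(content, searchTerm, length = 200) {
    const idx = content.toLowerCase().indexOf(searchTerm.toLowerCase());
    if (idx === -1) return content.slice(0, length);
    const start = Math.max(0, idx - Math.floor(length / 2));
    return content.slice(start, start + length);
}
```

Unlike the aggregation version, this one is case-insensitive end to end, since the index lookup and the match use the same lowercased strings.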

Conclusion

Congratulations! You've completed the MongoDB Zero to Hero series. You've learned:

  1. MongoDB Fundamentals - Core concepts and document model
  2. Installation and Setup - Getting MongoDB running
  3. CRUD Operations - Basic database operations
  4. Data Modeling - Schema design patterns
  5. Indexing and Performance - Optimization strategies
  6. Aggregation Pipeline - Advanced data processing
  7. Node.js Integration - Building applications
  8. Production Deployment - Scaling and reliability
  9. Monitoring and Maintenance - Operational practices
  10. Advanced Topics - Expert-level features

You now have the knowledge to build production-ready MongoDB applications. Continue exploring MongoDB's evolving feature set and applying these concepts to real-world projects.

This concludes the MongoDB Zero to Hero series. You're now equipped with expert-level MongoDB knowledge to tackle any database challenge!
