TTL & Change Streams
This module covers two MongoDB features for managing data lifecycle and reacting to changes: TTL (Time-To-Live) indexes that automatically expire documents, and Change Streams for real-time data change notifications.
TTL (Time-To-Live) Indexes
MongoDB can automatically delete documents after a certain time, useful for temporary data like sessions, logs, and cache.
TTL Index Basics
// Create a TTL index on a Date field
// Documents are deleted after the specified seconds have elapsed
db.sessions.createIndex(
{ createdAt: 1 },
{ expireAfterSeconds: 3600 } // Delete after 1 hour
);
// Insert session data
db.sessions.insertOne({
_id: ObjectId("..."),
userId: "alice",
token: "abc123xyz",
createdAt: new Date() // Document created now
});
// After 3600 seconds (1 hour), MongoDB automatically deletes this document
// No action needed — completely automated
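The rule MongoDB applies can be sketched in plain JavaScript. This is a hedged model of the arithmetic only (not the server's actual deletion code): a document becomes eligible for deletion once the indexed date plus `expireAfterSeconds` is in the past. The function name `isExpired` is illustrative.

```javascript
// Model of the TTL eligibility rule: indexedDate + expireAfterSeconds <= now
function isExpired(doc, field, expireAfterSeconds, now = new Date()) {
  const indexed = doc[field];
  if (!(indexed instanceof Date)) return false; // non-Date values never expire
  return indexed.getTime() + expireAfterSeconds * 1000 <= now.getTime();
}

const session = { userId: "alice", createdAt: new Date("2024-01-01T00:00:00Z") };
isExpired(session, "createdAt", 3600, new Date("2024-01-01T01:00:01Z")); // true: older than 1 hour
isExpired(session, "createdAt", 3600, new Date("2024-01-01T00:30:00Z")); // false: only 30 minutes old
```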
TTL Index Rules
// 1. The TTL field must hold a Date value (non-Date values never expire)
db.logs.createIndex({ timestamp: 1 }, { expireAfterSeconds: 86400 }); // ✅ Date field
// 2. Cannot be the _id field
db.sessions.createIndex({ _id: 1 }, { expireAfterSeconds: 3600 }); // ❌ Error
// 3. Cannot be a compound index; compound indexes ignore expireAfterSeconds
db.logs.createIndex({ timestamp: 1, level: 1 }, { expireAfterSeconds: 86400 }); // ❌ No expiration
// 4. Can combine with other index properties such as sparse
db.cache.createIndex(
{ expiresAt: 1 },
{ expireAfterSeconds: 0, sparse: true }
);
// 5. expireAfterSeconds: 0 means a document expires as soon as the date in
// the indexed field has passed, so each document can carry its own deadline
db.tempData.createIndex({ expiresAt: 1 }, { expireAfterSeconds: 0 });
// Useful for one-time use documents with per-document expiration times
Practical TTL Examples
Sessions (Auto-expire after 24 hours)
// Create index: sessions older than 24h are deleted
db.sessions.createIndex(
{ createdAt: 1 },
{ expireAfterSeconds: 86400 } // 24 hours
);
// Insert session
db.sessions.insertOne({
_id: ObjectId("..."),
userId: "alice",
token: generateToken(),
createdAt: new Date(),
lastActivity: new Date()
});
// Automatic cleanup — no cron job needed
// Perfect for temporary authentication tokens
Cache (Auto-expire after custom duration)
// Each cache entry carries its own deadline: with expireAfterSeconds: 0,
// the document is deleted once the date stored in expiresAt has passed
db.cache.createIndex(
{ expiresAt: 1 },
{ expireAfterSeconds: 0 }
);
db.cache.insertOne({
key: "user:alice:profile",
value: { name: "Alice", email: "alice@example.com" },
expiresAt: new Date(Date.now() + 30 * 60 * 1000) // Now + 30 minutes
});
Application Logs (Auto-expire after 7 days)
db.logs.createIndex(
{ timestamp: 1 },
{ expireAfterSeconds: 604800 } // 7 days
);
db.logs.insertOne({
level: "info",
message: "User alice logged in",
timestamp: new Date()
});
// Logs older than 7 days are automatically removed
// No need for manual cleanup scripts
One-Time Use Data (Per-Document Expiration)
// With expireAfterSeconds: 0, each document expires at the time stored in
// the indexed field, which is ideal for per-document deadlines
db.otp_tokens.createIndex(
{ expiresAt: 1 },
{ expireAfterSeconds: 0 }
);
// Each OTP carries its own expiration time
db.otp_tokens.insertOne({
_id: generateUUID(),
userId: "alice",
code: "123456",
createdAt: new Date(),
expiresAt: new Date(Date.now() + 5 * 60 * 1000) // 5 minutes from now
});
// MongoDB deletes the token once expiresAt has passed (allow ~1 minute of delay)
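Because TTL cleanup can lag behind the deadline, the application should still check validity itself. Below is a hedged sketch of that application-side logic; `generateOtpDoc`, `isOtpValid`, and `OTP_TTL_MS` are illustrative names, not MongoDB APIs.

```javascript
const OTP_TTL_MS = 5 * 60 * 1000; // 5 minutes

// Build an OTP document carrying its own expiresAt deadline
function generateOtpDoc(userId, code, now = new Date()) {
  return {
    userId,
    code,
    createdAt: now,
    expiresAt: new Date(now.getTime() + OTP_TTL_MS)
  };
}

// Re-check the deadline on use: an expired token may still exist in the
// collection for up to a minute before the TTL monitor removes it
function isOtpValid(doc, now = new Date()) {
  return doc.expiresAt.getTime() > now.getTime();
}
```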
TTL Behavior Details
// Background thread checks TTL indexes every 60 seconds
// Deletion is not instantaneous — allow ~1 minute delay
// Delayed example:
db.sessions.insertOne({
userId: "alice",
createdAt: new Date(Date.now() - 3700000) // 3700 seconds ago (> 3600)
});
print(db.sessions.countDocuments({})); // Still exists initially
// Wait ~1 minute...
print(db.sessions.countDocuments({})); // Now deleted
// Notes:
// - The TTL monitor runs only on the primary; deletes replicate to
//   secondaries through the oplog
// - Sharded clusters: the primary of each shard runs TTL cleanup independently
Capped Collections
Fixed-size collections that automatically remove oldest documents when full.
// Create a capped collection of 10MB max size
db.createCollection(
"logs",
{ capped: true, size: 10485760 } // 10MB
);
// Or capped by document count (size is still required and also enforced)
db.createCollection(
"events",
{ capped: true, size: 10485760, max: 1000 } // Max 1000 documents, up to 10MB
);
// Inserts work normally
db.logs.insertMany([
{ level: "info", message: "Application started" },
{ level: "error", message: "Connection timeout" }
]);
// When collection reaches 10MB, oldest documents are automatically removed
// FIFO (First-In-First-Out) behavior
// Queries on capped collections return insertion order by default
db.logs.find().sort({ $natural: 1 }); // Oldest first
db.logs.find().sort({ $natural: -1 }); // Newest first (reverse)
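The FIFO behavior can be sketched as a small in-memory structure. This is an illustration only, capped by document count; real capped collections are primarily capped by bytes, with `max` as an optional second limit.

```javascript
// In-memory model of capped-collection FIFO semantics
class CappedBuffer {
  constructor(max) {
    this.max = max;
    this.docs = [];
  }
  insert(doc) {
    this.docs.push(doc);
    while (this.docs.length > this.max) this.docs.shift(); // evict oldest first
  }
  findNatural() { return [...this.docs]; }                  // insertion order
  findReverseNatural() { return [...this.docs].reverse(); } // newest first
}

const logs = new CappedBuffer(3);
["a", "b", "c", "d"].forEach(m => logs.insert({ message: m }));
logs.findNatural().map(d => d.message); // ["b", "c", "d"]: "a" was evicted
```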
Capped vs. TTL
| Feature | Capped Collection | TTL Index |
|---|---|---|
| Removal Trigger | Size limit reached | Time elapsed |
| Order | FIFO (oldest first) | Time-based |
| Best For | Fixed-size logs, activity feeds | Sessions, transient data, cache |
| Overhead | Lower (simple deletion) | Higher (background check) |
| Flexibility | Less (all docs same age) | More (selective based on time) |
| Practical Use | System logs, event streams | Auth tokens, temporary data |
Change Streams
Change Streams allow applications to listen to real-time data changes in a collection without polling.
Basic Change Stream
// Watch the entire collection for changes
// (event-emitter API as in the Node.js driver; in mongosh you would
// iterate the returned cursor, e.g. with changeStream.next())
const changeStream = db.users.watch();
// Listen for changes
changeStream.on("change", change => {
console.log("Change detected:", change);
// {
// _id: { _data: "..." }, // resume token
// operationType: "insert" | "update" | "replace" | "delete" | "invalidate",
// ns: { db: "mydb", coll: "users" },
// documentKey: { _id: ObjectId("...") },
// fullDocument: { _id: ObjectId("..."), name: "Alice", ... }, // insert/replace; for updates only with fullDocument: "updateLookup"
// updateDescription: { updatedFields: {...}, removedFields: [...] } // For update
// }
});
// When another client updates a user:
db.users.updateOne(
{ _id: ObjectId("...") },
{ $set: { status: "active" } }
);
// Change stream prints:
// Change detected: {
// operationType: "update",
// documentKey: { _id: ObjectId("...") },
// updateDescription: { updatedFields: { status: "active" }, removedFields: [] }
// }
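Handlers typically dispatch on `operationType`. The sketch below runs against a hand-built mock event in the shape shown above, so no server is needed; the `handleChange` function is an illustrative name.

```javascript
// Dispatch a change event by operation type
function handleChange(change) {
  switch (change.operationType) {
    case "insert":
      return `inserted ${change.documentKey._id}`;
    case "update":
      return `updated fields: ${Object.keys(change.updateDescription.updatedFields).join(", ")}`;
    case "delete":
      return `deleted ${change.documentKey._id}`;
    default:
      return `unhandled: ${change.operationType}`;
  }
}

// Mock event mirroring the update shown above
const mockUpdate = {
  operationType: "update",
  ns: { db: "mydb", coll: "users" },
  documentKey: { _id: "u1" },
  updateDescription: { updatedFields: { status: "active" }, removedFields: [] }
};
handleChange(mockUpdate); // "updated fields: status"
```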
Filtered Change Stream
Watch for specific changes only:
// Watch inserts and updates where the document's status is "completed".
// The fullDocument: "updateLookup" option is required so that update
// events carry fullDocument for the $match on "fullDocument.status".
const changeStream = db.orders.watch(
[
{
$match: {
operationType: { $in: ["insert", "update"] },
"fullDocument.status": "completed"
}
}
],
{ fullDocument: "updateLookup" }
);
changeStream.on("change", change => {
console.log("Order completed:", change.fullDocument);
});
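The `$match` stage can be mirrored as a plain client-side predicate, which is handy for unit-testing filter logic without a running deployment. The events below are hand-built mocks and the function name is illustrative.

```javascript
// Client-side mirror of the $match pipeline above
function matchesCompletedOrder(change) {
  return (
    ["insert", "update"].includes(change.operationType) &&
    change.fullDocument !== undefined &&
    change.fullDocument.status === "completed"
  );
}

matchesCompletedOrder({ operationType: "insert", fullDocument: { status: "completed" } }); // true
matchesCompletedOrder({ operationType: "delete", fullDocument: { status: "completed" } }); // false
```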
Change Stream Resume Tokens
Resume a stream from where it left off (after reconnection):
// Store resume token
let resumeToken = null;
const changeStream = db.users.watch();
changeStream.on("change", change => {
console.log("Change:", change);
resumeToken = change._id; // Save resume token
});
changeStream.on("error", err => {
console.log("Stream error:", err);
// Note: official drivers retry resumable errors automatically; manual
// resume matters most when restarting a stopped stream or process
// Reconnect with resume token
if (resumeToken) {
const newStream = db.users.watch([], { resumeAfter: resumeToken });
console.log("Resumed from", resumeToken);
// Continue listening from last position
}
});
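For a restarted process to resume, the token has to outlive the process. The store below is a minimal in-memory sketch of that idea; production code would persist the token to disk or a dedicated collection, and the class name is illustrative.

```javascript
// Holds the latest resume token; swap the backing store for something durable
class ResumeTokenStore {
  constructor() { this.token = null; }
  save(token) { this.token = token; }
  load() { return this.token; } // null if nothing saved yet
}

// Intended wiring (Node.js driver style, matching the example above):
// const store = new ResumeTokenStore();
// const options = store.load() ? { resumeAfter: store.load() } : {};
// const stream = db.users.watch([], options);
// stream.on("change", change => store.save(change._id));
```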
Change Streams in Multi-Document Transactions
// A change stream emits one event per operation; events from the same
// transaction share an lsid and txnNumber, and appear only after commit
const changeStream = db.accounts.watch();
changeStream.on("change", change => {
// Each operation arrives as its own event
if (change.txnNumber !== undefined) {
console.log("Transaction number:", change.txnNumber);
console.log("Operation in this transaction:", change);
}
});
// In another session (mongosh):
const session = db.getMongo().startSession();
session.startTransaction();
const accounts = session.getDatabase("mydb").accounts;
accounts.updateOne(
{ _id: "alice" },
{ $inc: { balance: -100 } }
);
accounts.updateOne(
{ _id: "bob" },
{ $inc: { balance: 100 } }
);
session.commitTransaction();
// Change stream receives both updates with same txnNumber
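Since each operation in a transaction arrives as its own event, a consumer that needs whole transactions can group events by `txnNumber`. The events below are hand-built mocks standing in for the two balance updates above; `groupByTxn` is an illustrative helper.

```javascript
// Group change events by transaction number
function groupByTxn(events) {
  const groups = new Map();
  events.forEach((e, i) => {
    // Non-transactional events get a unique key so they stay ungrouped
    const key = e.txnNumber !== undefined ? `txn:${e.txnNumber}` : `evt:${i}`;
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(e);
  });
  return groups;
}

const events = [
  { operationType: "update", documentKey: { _id: "alice" }, txnNumber: 7 },
  { operationType: "update", documentKey: { _id: "bob" }, txnNumber: 7 },
  { operationType: "insert", documentKey: { _id: "carol" } }
];
groupByTxn(events).get("txn:7").length; // 2: both updates from the transaction
```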
Practical Change Stream Examples
Real-Time Notifications
// Send a notification whenever an order's status changes.
// updateLookup makes change.fullDocument available on update events.
const changeStream = db.orders.watch(
[
{
$match: {
operationType: "update",
"updateDescription.updatedFields.status": { $exists: true }
}
}
],
{ fullDocument: "updateLookup" }
);
changeStream.on("change", change => {
const orderId = change.documentKey._id;
const newStatus = change.updateDescription.updatedFields.status;
const userId = change.fullDocument.userId;
// Send notification
notify.send(userId, `Your order ${orderId} is now ${newStatus}`);
});
Cache Invalidation
// Invalidate cache whenever product changes
const changeStream = db.products.watch();
changeStream.on("change", change => {
const productId = change.documentKey._id;
// Remove from cache
cache.delete(`product:${productId}`);
// Publish to message queue for other services
messageQueue.publish("product-changed", { productId });
});
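The invalidation step can be exercised without a server by using a plain `Map` as the cache and a hand-built change event. The cache object, key scheme, and function name below are illustrative.

```javascript
// Plain Map standing in for the cache layer
const cache = new Map();
cache.set("product:p1", { name: "Widget", price: 9.99 });

// Drop the stale entry so the next read becomes a cache miss
function onProductChange(change) {
  const key = `product:${change.documentKey._id}`;
  cache.delete(key);
  return key; // the key that was invalidated
}

onProductChange({ operationType: "update", documentKey: { _id: "p1" } });
cache.has("product:p1"); // false: entry was invalidated
```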
Activity Feed
// Stream user activity in real-time
const changeStream = db.user_activities.watch([
{ $match: { operationType: "insert" } }
]);
changeStream.on("change", change => {
const activity = change.fullDocument;
// Add to activity feed in real-time
emitToWebSocket(`user-${activity.userId}:activity`, activity);
});
Summary
TTL Indexes: - Automatically delete documents after specified seconds - Perfect for sessions, temporary data, cache, logs - Background cleanup runs every 60 seconds - Simple and requires no application logic
Capped Collections: - Fixed size, automatic FIFO deletion when full - Good for logs and event streams with predictable size - Simpler than TTL, but less flexible
Change Streams: - Real-time listening to data changes - Watch collections or filtered changes - Resume from saved position using resume tokens - Perfect for notifications, cache invalidation, activity feeds - Works with transactions: committed operations arrive as individual events sharing a txnNumber
Best Practices: - Use TTL for time-based expiration - Use Change Streams for real-time applications - Store resume tokens for fault tolerance - Filter change streams for specific event types - Combine TTL with other features for complete data lifecycle management