🧪 Lab 06: TTL & Capped Collections
Topics: TTL indexes, per-doc expiry, capped collections, change streams
In [ ]:
Copied!
from pymongo import MongoClient, ReadPreference
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from pymongo.errors import ConnectionFailure
from bson import ObjectId
from datetime import datetime, timedelta, timezone
import pprint
# --- Connection setup ---------------------------------------------------
# Set USE_ATLAS to True (and fill in ATLAS_URI) to target an Atlas cluster
# instead of the local Docker instance.
USE_ATLAS = False
ATLAS_URI = "mongodb+srv://<username>:<password>@<cluster>.mongodb.net/?retryWrites=true&w=majority"
DOCKER_URI = "mongodb://127.0.0.1:27017/?directConnection=true"

uri = ATLAS_URI if USE_ATLAS else DOCKER_URI
# Server selection is capped at 5000 ms so an unreachable server fails fast.
client = MongoClient(uri, serverSelectionTimeoutMS=5000)

try:
    # The ping forces a round trip so connection problems surface here.
    client.admin.command("ping")
except ConnectionFailure as e:
    # Best-effort: report the failure and keep going, as the lab intends.
    print("❌ Connection failed:", e)
else:
    print("✅ Connected to", "Atlas" if USE_ATLAS else "Docker MongoDB")

# Handles shared by the cells that follow.
db = client["mongo_labs"]
pp = pprint.PrettyPrinter(indent=2)
from pymongo import MongoClient, ReadPreference
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from pymongo.errors import ConnectionFailure
from bson import ObjectId
from datetime import datetime, timedelta, timezone
import pprint
# --- Connection setup ---------------------------------------------------
# Set USE_ATLAS to True (and fill in ATLAS_URI) to target an Atlas cluster
# instead of the local Docker instance.
USE_ATLAS = False
ATLAS_URI = "mongodb+srv://<username>:<password>@<cluster>.mongodb.net/?retryWrites=true&w=majority"
DOCKER_URI = "mongodb://127.0.0.1:27017/?directConnection=true"

uri = ATLAS_URI if USE_ATLAS else DOCKER_URI
# Server selection is capped at 5000 ms so an unreachable server fails fast.
client = MongoClient(uri, serverSelectionTimeoutMS=5000)

try:
    # The ping forces a round trip so connection problems surface here.
    client.admin.command("ping")
except ConnectionFailure as e:
    # Best-effort: report the failure and keep going, as the lab intends.
    print("❌ Connection failed:", e)
else:
    print("✅ Connected to", "Atlas" if USE_ATLAS else "Docker MongoDB")

# Handles shared by the cells that follow.
db = client["mongo_labs"]
pp = pprint.PrettyPrinter(indent=2)
TTL Index — Auto-delete after N seconds
In [ ]:
Copied!
# Rebuild the sessions collection with a 30-second TTL index on createdAt.
db.sessions.drop()
db.sessions.create_index(
    [("createdAt", 1)],
    expireAfterSeconds=30,
    name="idx_sessions_ttl",
)

# Two fresh sessions plus one whose createdAt is already two minutes old.
session_docs = [
    {
        "sessionId": "sess-001",
        "userId": "alice",
        "data": {"page": "/home"},
        "createdAt": datetime.now(timezone.utc),
    },
    {
        "sessionId": "sess-002",
        "userId": "bob",
        "data": {"page": "/catalog"},
        "createdAt": datetime.now(timezone.utc),
    },
    {
        "sessionId": "sess-old",
        "userId": "charlie",
        "data": {},
        "createdAt": datetime.now(timezone.utc) - timedelta(minutes=2),
    },
]
db.sessions.insert_many(session_docs)

print("Inserted 3 sessions (sess-old will auto-delete in ~60s)")
print("Count:", db.sessions.count_documents({}))
# Rebuild the sessions collection with a 30-second TTL index on createdAt.
db.sessions.drop()
db.sessions.create_index(
    [("createdAt", 1)],
    expireAfterSeconds=30,
    name="idx_sessions_ttl",
)

# Two fresh sessions plus one whose createdAt is already two minutes old.
session_docs = [
    {
        "sessionId": "sess-001",
        "userId": "alice",
        "data": {"page": "/home"},
        "createdAt": datetime.now(timezone.utc),
    },
    {
        "sessionId": "sess-002",
        "userId": "bob",
        "data": {"page": "/catalog"},
        "createdAt": datetime.now(timezone.utc),
    },
    {
        "sessionId": "sess-old",
        "userId": "charlie",
        "data": {},
        "createdAt": datetime.now(timezone.utc) - timedelta(minutes=2),
    },
]
db.sessions.insert_many(session_docs)

print("Inserted 3 sessions (sess-old will auto-delete in ~60s)")
print("Count:", db.sessions.count_documents({}))
Modify TTL with collMod
In [ ]:
Copied!
# Change the expiry of the existing TTL index in place via collMod.
# "collMod" stays the first key so it is read as the command name.
ttl_change = {
    "collMod": "sessions",
    "index": {
        "name": "idx_sessions_ttl",
        "expireAfterSeconds": 3600,
    },
}
db.command(ttl_change)
print("Extended TTL to 1 hour")
# Change the expiry of the existing TTL index in place via collMod.
# "collMod" stays the first key so it is read as the command name.
ttl_change = {
    "collMod": "sessions",
    "index": {
        "name": "idx_sessions_ttl",
        "expireAfterSeconds": 3600,
    },
}
db.command(ttl_change)
print("Extended TTL to 1 hour")
Per-Document Expiry Pattern
In [ ]:
Copied!
# Per-document expiry: the TTL index uses expireAfterSeconds=0, so each
# token is eligible for removal at the time stored in its own expiresAt.
db.tokens.drop()
db.tokens.create_index(
    [("expiresAt", 1)],
    expireAfterSeconds=0,
    name="idx_tokens_expire",
)

db.tokens.insert_many([
    {
        "token": "reset-abc",
        "userId": "alice",
        "type": "password_reset",
        "expiresAt": datetime.now(timezone.utc) + timedelta(minutes=15),
    },
    {
        "token": "otp-xyz",
        "userId": "bob",
        "type": "otp",
        "expiresAt": datetime.now(timezone.utc) + timedelta(minutes=5),
    },
])

# Show each token with its own expiry time (projection hides _id/userId).
for t in db.tokens.find({}, {"_id": 0, "token": 1, "type": 1, "expiresAt": 1}):
    print(f" {t['token']} ({t['type']}) expires: {t['expiresAt']}")
# Per-document expiry: the TTL index uses expireAfterSeconds=0, so each
# token is eligible for removal at the time stored in its own expiresAt.
db.tokens.drop()
db.tokens.create_index(
    [("expiresAt", 1)],
    expireAfterSeconds=0,
    name="idx_tokens_expire",
)

db.tokens.insert_many([
    {
        "token": "reset-abc",
        "userId": "alice",
        "type": "password_reset",
        "expiresAt": datetime.now(timezone.utc) + timedelta(minutes=15),
    },
    {
        "token": "otp-xyz",
        "userId": "bob",
        "type": "otp",
        "expiresAt": datetime.now(timezone.utc) + timedelta(minutes=5),
    },
])

# Show each token with its own expiry time (projection hides _id/userId).
for t in db.tokens.find({}, {"_id": 0, "token": 1, "type": 1, "expiresAt": 1}):
    print(f" {t['token']} ({t['type']}) expires: {t['expiresAt']}")
Capped Collections — Fixed-size ring buffer
In [ ]:
Copied!
# Start from a clean slate. drop() is called unconditionally, like the
# sessions/tokens cells do; PyMongo treats a missing collection as a no-op,
# so the list_collection_names() pre-check is unnecessary.
db.activity_log.drop()
# Capped collection: at most 10 documents / 10240 bytes.
db.create_collection("activity_log", capped=True, size=10240, max=10)

# Insert 15 events into a collection that can hold only 10.
for i in range(1, 16):
    db.activity_log.insert_one(
        {"seq": i, "action": f"event-{i}", "ts": datetime.now(timezone.utc)}
    )

count = db.activity_log.count_documents({})
print(f"Inserted 15, retained: {count} (capped at 10)")

# List whatever survived the cap.
for doc in db.activity_log.find({}, {"_id": 0, "seq": 1, "action": 1}):
    print(f" seq={doc['seq']}: {doc['action']}")
# Start from a clean slate. drop() is called unconditionally, like the
# sessions/tokens cells do; PyMongo treats a missing collection as a no-op,
# so the list_collection_names() pre-check is unnecessary.
db.activity_log.drop()
# Capped collection: at most 10 documents / 10240 bytes.
db.create_collection("activity_log", capped=True, size=10240, max=10)

# Insert 15 events into a collection that can hold only 10.
for i in range(1, 16):
    db.activity_log.insert_one(
        {"seq": i, "action": f"event-{i}", "ts": datetime.now(timezone.utc)}
    )

count = db.activity_log.count_documents({})
print(f"Inserted 15, retained: {count} (capped at 10)")

# List whatever survived the cap.
for doc in db.activity_log.find({}, {"_id": 0, "seq": 1, "action": 1}):
    print(f" seq={doc['seq']}: {doc['action']}")
Capped Stats
In [ ]:
Copied!
# Read the capped-collection settings back from collStats.
coll_stats = db.command("collStats", "activity_log")
is_capped = coll_stats.get("capped")
max_docs = coll_stats.get("max")
max_bytes = coll_stats.get("maxSize")
print(f"Capped: {is_capped}, Max docs: {max_docs}, Max size: {max_bytes} bytes")
# Read the capped-collection settings back from collStats.
coll_stats = db.command("collStats", "activity_log")
is_capped = coll_stats.get("capped")
max_docs = coll_stats.get("max")
max_bytes = coll_stats.get("maxSize")
print(f"Capped: {is_capped}, Max docs: {max_docs}, Max size: {max_bytes} bytes")
Change Streams (concept)
In [ ]:
Copied!
print("Opening change stream on orders...")

# NOTE(review): per the MongoDB docs, change streams need a replica set or
# sharded cluster — confirm the lab's Docker instance is started that way.
# Only insert and update events are let through.
pipeline = [{"$match": {"operationType": {"$in": ["insert", "update"]}}}]

with db.orders.watch(pipeline, max_await_time_ms=3000) as stream:
    # Write after the stream is open so the insert is visible to it.
    db.orders.insert_one(
        {
            "userName": "StreamTest",
            "status": "pending",
            "total": 1.0,
            "orderedAt": datetime.now(timezone.utc),
        }
    )
    # try_next() returns None instead of blocking when nothing arrives.
    change = stream.try_next()

if change is not None:
    print(f"✅ Change event: {change['operationType']} on {change.get('fullDocument',{}).get('userName')}")
else:
    print("(No change event in time window)")
print("Opening change stream on orders...")

# NOTE(review): per the MongoDB docs, change streams need a replica set or
# sharded cluster — confirm the lab's Docker instance is started that way.
# Only insert and update events are let through.
pipeline = [{"$match": {"operationType": {"$in": ["insert", "update"]}}}]

with db.orders.watch(pipeline, max_await_time_ms=3000) as stream:
    # Write after the stream is open so the insert is visible to it.
    db.orders.insert_one(
        {
            "userName": "StreamTest",
            "status": "pending",
            "total": 1.0,
            "orderedAt": datetime.now(timezone.utc),
        }
    )
    # try_next() returns None instead of blocking when nothing arrives.
    change = stream.try_next()

if change is not None:
    print(f"✅ Change event: {change['operationType']} on {change.get('fullDocument',{}).get('userName')}")
else:
    print("(No change event in time window)")