In [ ]:
Copied!
from pymongo import MongoClient, ReadPreference
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from pymongo.errors import ConnectionFailure
from bson import ObjectId
from datetime import datetime, timedelta, timezone
import pprint
# Connection settings: set USE_ATLAS to True to use ATLAS_URI instead of the
# local DOCKER_URI endpoint.
USE_ATLAS = False
ATLAS_URI = "mongodb+srv://<username>:<password>@<cluster>.mongodb.net/?retryWrites=true&w=majority"
DOCKER_URI = "mongodb://127.0.0.1:27017/?directConnection=true"

if USE_ATLAS:
    uri = ATLAS_URI
else:
    uri = DOCKER_URI

# serverSelectionTimeoutMS bounds how long the ping below waits for a server.
client = MongoClient(uri, serverSelectionTimeoutMS=5000)

# Verify the connection up front with a "ping"; a failure is reported but does
# not stop the rest of the notebook from being defined.
try:
    client.admin.command("ping")
except ConnectionFailure as exc:
    print("❌ Connection failed:", exc)
else:
    print("✅ Connected to", "Atlas" if USE_ATLAS else "Docker MongoDB")

# Shared handles used by every cell below.
db = client["mongo_labs"]
pp = pprint.PrettyPrinter(indent=2)
from pymongo import MongoClient, ReadPreference
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from pymongo.errors import ConnectionFailure
from bson import ObjectId
from datetime import datetime, timedelta, timezone
import pprint
# Connection settings: set USE_ATLAS to True to use ATLAS_URI instead of the
# local DOCKER_URI endpoint.
USE_ATLAS = False
ATLAS_URI = "mongodb+srv://<username>:<password>@<cluster>.mongodb.net/?retryWrites=true&w=majority"
DOCKER_URI = "mongodb://127.0.0.1:27017/?directConnection=true"

if USE_ATLAS:
    uri = ATLAS_URI
else:
    uri = DOCKER_URI

# serverSelectionTimeoutMS bounds how long the ping below waits for a server.
client = MongoClient(uri, serverSelectionTimeoutMS=5000)

# Verify the connection up front with a "ping"; a failure is reported but does
# not stop the rest of the notebook from being defined.
try:
    client.admin.command("ping")
except ConnectionFailure as exc:
    print("❌ Connection failed:", exc)
else:
    print("✅ Connected to", "Atlas" if USE_ATLAS else "Docker MongoDB")

# Shared handles used by every cell below.
db = client["mongo_labs"]
pp = pprint.PrettyPrinter(indent=2)
Setup
In [ ]:
Copied!
# Drop both collections first so re-running this cell starts from scratch.
for stale_collection in (db.users, db.orders):
    stale_collection.drop()

# Insert the two users and keep their generated _id values for the orders.
alice_insert = db.users.insert_one(
    {"name": "Alice Johnson", "email": "alice@example.com"}
)
alice_id = alice_insert.inserted_id
bob_insert = db.users.insert_one(
    {"name": "Bob Smith", "email": "bob@example.com"}
)
bob_id = bob_insert.inserted_id

# Three orders: two for Alice (one delivered, one pending), one for Bob.
seed_orders = [
    {
        "userId": alice_id,
        "status": "delivered",
        "total": 120.0,
        "orderedAt": datetime(2024, 4, 5),
    },
    {
        "userId": alice_id,
        "status": "pending",
        "total": 45.0,
        "orderedAt": datetime(2024, 4, 12),
    },
    {
        "userId": bob_id,
        "status": "delivered",
        "total": 349.99,
        "orderedAt": datetime(2024, 4, 10),
    },
]
db.orders.insert_many(seed_orders)
print("Users + orders seeded")
# Drop both collections first so re-running this cell starts from scratch.
for stale_collection in (db.users, db.orders):
    stale_collection.drop()

# Insert the two users and keep their generated _id values for the orders.
alice_insert = db.users.insert_one(
    {"name": "Alice Johnson", "email": "alice@example.com"}
)
alice_id = alice_insert.inserted_id
bob_insert = db.users.insert_one(
    {"name": "Bob Smith", "email": "bob@example.com"}
)
bob_id = bob_insert.inserted_id

# Three orders: two for Alice (one delivered, one pending), one for Bob.
seed_orders = [
    {
        "userId": alice_id,
        "status": "delivered",
        "total": 120.0,
        "orderedAt": datetime(2024, 4, 5),
    },
    {
        "userId": alice_id,
        "status": "pending",
        "total": 45.0,
        "orderedAt": datetime(2024, 4, 12),
    },
    {
        "userId": bob_id,
        "status": "delivered",
        "total": 349.99,
        "orderedAt": datetime(2024, 4, 10),
    },
]
db.orders.insert_many(seed_orders)
print("Users + orders seeded")
$lookup — Join collections
In [ ]:
Copied!
# Join every delivered order with the user document it references and print
# one line per order.
pipeline = [
    {"$match": {"status": "delivered"}},
    {
        "$lookup": {
            "from": "users",
            "localField": "userId",
            "foreignField": "_id",
            "as": "userDetails",
        }
    },
    # Flatten the joined array into a single embedded document. The option
    # name must be "preserveNullAndEmptyArrays"; the server rejects
    # unrecognized $unwind options, so a misspelling fails the whole pipeline.
    {"$unwind": {"path": "$userDetails", "preserveNullAndEmptyArrays": True}},
    {
        "$project": {
            "_id": 0,
            "customer": "$userDetails.name",
            "email": "$userDetails.email",
            "total": 1,
            "status": 1,
        }
    },
]
for r in db.orders.aggregate(pipeline):
    # Orders kept by preserveNullAndEmptyArrays without a matching user have
    # no customer/email fields, so fall back instead of raising KeyError.
    print(f" {r.get('customer', 'unknown')} ({r.get('email', 'n/a')}): ${r['total']}")
# Join every delivered order with the user document it references and print
# one line per order.
pipeline = [
    {"$match": {"status": "delivered"}},
    {
        "$lookup": {
            "from": "users",
            "localField": "userId",
            "foreignField": "_id",
            "as": "userDetails",
        }
    },
    # Flatten the joined array into a single embedded document. The option
    # name must be "preserveNullAndEmptyArrays"; the server rejects
    # unrecognized $unwind options, so a misspelling fails the whole pipeline.
    {"$unwind": {"path": "$userDetails", "preserveNullAndEmptyArrays": True}},
    {
        "$project": {
            "_id": 0,
            "customer": "$userDetails.name",
            "email": "$userDetails.email",
            "total": 1,
            "status": 1,
        }
    },
]
for r in db.orders.aggregate(pipeline):
    # Orders kept by preserveNullAndEmptyArrays without a matching user have
    # no customer/email fields, so fall back instead of raising KeyError.
    print(f" {r.get('customer', 'unknown')} ({r.get('email', 'n/a')}): ${r['total']}")
$lookup with pipeline
In [ ]:
Copied!
# Sub-pipeline run inside $lookup: orders belonging to the outer user ($$uid)
# whose status is anything other than "delivered".
non_delivered_orders = [
    {
        "$match": {
            "$expr": {
                "$and": [
                    {"$eq": ["$userId", "$$uid"]},
                    {"$ne": ["$status", "delivered"]},
                ]
            }
        }
    },
    {"$project": {"_id": 1, "status": 1, "total": 1}},
]

# For Alice, attach her non-delivered orders as "activeOrders".
pipeline = [
    {"$match": {"name": "Alice Johnson"}},
    {
        "$lookup": {
            "from": "orders",
            "let": {"uid": "$_id"},
            "pipeline": non_delivered_orders,
            "as": "activeOrders",
        }
    },
    {"$project": {"_id": 0, "name": 1, "activeOrders": 1}},
]
for user_doc in db.users.aggregate(pipeline):
    print(f"{user_doc['name']} active orders: {user_doc['activeOrders']}")
# Sub-pipeline run inside $lookup: orders belonging to the outer user ($$uid)
# whose status is anything other than "delivered".
non_delivered_orders = [
    {
        "$match": {
            "$expr": {
                "$and": [
                    {"$eq": ["$userId", "$$uid"]},
                    {"$ne": ["$status", "delivered"]},
                ]
            }
        }
    },
    {"$project": {"_id": 1, "status": 1, "total": 1}},
]

# For Alice, attach her non-delivered orders as "activeOrders".
pipeline = [
    {"$match": {"name": "Alice Johnson"}},
    {
        "$lookup": {
            "from": "orders",
            "let": {"uid": "$_id"},
            "pipeline": non_delivered_orders,
            "as": "activeOrders",
        }
    },
    {"$project": {"_id": 0, "name": 1, "activeOrders": 1}},
]
for user_doc in db.users.aggregate(pipeline):
    print(f"{user_doc['name']} active orders: {user_doc['activeOrders']}")
$facet — Multi-faceted aggregation
In [ ]:
Copied!
# Rebuild the products collection with five sample items.
db.products.drop()
sample_products = [
    {"name": "Keyboard", "category": "peripherals", "price": 89.99},
    {"name": "USB Hub", "category": "peripherals", "price": 22.99},
    {"name": "Monitor", "category": "monitors", "price": 349.99},
    {"name": "Stand", "category": "accessories", "price": 35.99},
    {"name": "Desk", "category": "furniture", "price": 499.0},
]
db.products.insert_many(sample_products)

# Facet 1: overall count and price statistics across all products.
price_stats_stages = [
    {
        "$group": {
            "_id": None,
            "total": {"$sum": 1},
            "avg": {"$avg": "$price"},
            "min": {"$min": "$price"},
            "max": {"$max": "$price"},
        }
    }
]
# Facet 2: number of products per category, most populated first.
category_count_stages = [
    {"$group": {"_id": "$category", "count": {"$sum": 1}}},
    {"$sort": {"count": -1}},
]
pipeline = [
    {"$facet": {"stats": price_stats_stages, "byCategory": category_count_stages}}
]

# Take the first result document, which holds both facets.
facet_docs = list(db.products.aggregate(pipeline))
result = facet_docs[0]
print("Stats:", result["stats"])
print("By category:", result["byCategory"])
# Rebuild the products collection with five sample items.
db.products.drop()
sample_products = [
    {"name": "Keyboard", "category": "peripherals", "price": 89.99},
    {"name": "USB Hub", "category": "peripherals", "price": 22.99},
    {"name": "Monitor", "category": "monitors", "price": 349.99},
    {"name": "Stand", "category": "accessories", "price": 35.99},
    {"name": "Desk", "category": "furniture", "price": 499.0},
]
db.products.insert_many(sample_products)

# Facet 1: overall count and price statistics across all products.
price_stats_stages = [
    {
        "$group": {
            "_id": None,
            "total": {"$sum": 1},
            "avg": {"$avg": "$price"},
            "min": {"$min": "$price"},
            "max": {"$max": "$price"},
        }
    }
]
# Facet 2: number of products per category, most populated first.
category_count_stages = [
    {"$group": {"_id": "$category", "count": {"$sum": 1}}},
    {"$sort": {"count": -1}},
]
pipeline = [
    {"$facet": {"stats": price_stats_stages, "byCategory": category_count_stages}}
]

# Take the first result document, which holds both facets.
facet_docs = list(db.products.aggregate(pipeline))
result = facet_docs[0]
print("Stats:", result["stats"])
print("By category:", result["byCategory"])
$bucket — Price ranges
In [ ]:
Copied!
# Group products into fixed price ranges; anything outside the boundaries
# goes to the "Other" bucket. Each bucket reports a count and product names.
bucket_stage = {
    "$bucket": {
        "groupBy": "$price",
        "boundaries": [0, 50, 100, 500],
        "default": "Other",
        "output": {
            "count": {"$sum": 1},
            "names": {"$push": "$name"},
        },
    }
}
pipeline = [bucket_stage]
for bucket in db.products.aggregate(pipeline):
    print(f" ${bucket['_id']}+: count={bucket['count']}, products={bucket['names']}")
# Group products into fixed price ranges; anything outside the boundaries
# goes to the "Other" bucket. Each bucket reports a count and product names.
bucket_stage = {
    "$bucket": {
        "groupBy": "$price",
        "boundaries": [0, 50, 100, 500],
        "default": "Other",
        "output": {
            "count": {"$sum": 1},
            "names": {"$push": "$name"},
        },
    }
}
pipeline = [bucket_stage]
for bucket in db.products.aggregate(pipeline):
    print(f" ${bucket['_id']}+: count={bucket['count']}, products={bucket['names']}")
$bucketAuto
In [ ]:
Copied!
# Ask the server to split products into 3 price buckets and print each
# bucket's min/max bounds and document count.
auto_bucket_stage = {"$bucketAuto": {"groupBy": "$price", "buckets": 3}}
pipeline = [auto_bucket_stage]
for bucket in db.products.aggregate(pipeline):
    bounds = bucket["_id"]
    print(f" ${bounds['min']:.2f}–${bounds['max']:.2f}: {bucket['count']} products")
# Ask the server to split products into 3 price buckets and print each
# bucket's min/max bounds and document count.
auto_bucket_stage = {"$bucketAuto": {"groupBy": "$price", "buckets": 3}}
pipeline = [auto_bucket_stage]
for bucket in db.products.aggregate(pipeline):
    bounds = bucket["_id"]
    print(f" ${bounds['min']:.2f}–${bounds['max']:.2f}: {bucket['count']} products")
$graphLookup — Recursive traversal
In [ ]:
Copied!
# Rebuild a four-level reporting hierarchy; each employee's "reportsTo" holds
# the *name* of their manager (None for the CEO).
db.employees.drop()
org_chart = [
    {"_id": 1, "name": "CEO", "reportsTo": None},
    {"_id": 2, "name": "VP Eng", "reportsTo": "CEO"},
    {"_id": 3, "name": "Sr Engineer", "reportsTo": "VP Eng"},
    {"_id": 4, "name": "Engineer", "reportsTo": "Sr Engineer"},
]
db.employees.insert_many(org_chart)

# Starting from "Engineer", repeatedly follow reportsTo -> name and collect
# the matched managers in "chain", tagging each with its recursion depth.
management_chain_stage = {
    "$graphLookup": {
        "from": "employees",
        "startWith": "$reportsTo",
        "connectFromField": "reportsTo",
        "connectToField": "name",
        "as": "chain",
        "maxDepth": 5,
        "depthField": "depth",
    }
}
pipeline = [
    {"$match": {"name": "Engineer"}},
    management_chain_stage,
    {"$project": {"name": 1, "chain": 1}},
]

matched_employees = list(db.employees.aggregate(pipeline))
result = matched_employees[0]

# Sort by depth before printing so the output order is deterministic.
ordered_chain = list(result["chain"])
ordered_chain.sort(key=lambda member: member["depth"])
for m in ordered_chain:
    print(f" depth {m['depth']}: {m['name']}")
# Rebuild a four-level reporting hierarchy; each employee's "reportsTo" holds
# the *name* of their manager (None for the CEO).
db.employees.drop()
org_chart = [
    {"_id": 1, "name": "CEO", "reportsTo": None},
    {"_id": 2, "name": "VP Eng", "reportsTo": "CEO"},
    {"_id": 3, "name": "Sr Engineer", "reportsTo": "VP Eng"},
    {"_id": 4, "name": "Engineer", "reportsTo": "Sr Engineer"},
]
db.employees.insert_many(org_chart)

# Starting from "Engineer", repeatedly follow reportsTo -> name and collect
# the matched managers in "chain", tagging each with its recursion depth.
management_chain_stage = {
    "$graphLookup": {
        "from": "employees",
        "startWith": "$reportsTo",
        "connectFromField": "reportsTo",
        "connectToField": "name",
        "as": "chain",
        "maxDepth": 5,
        "depthField": "depth",
    }
}
pipeline = [
    {"$match": {"name": "Engineer"}},
    management_chain_stage,
    {"$project": {"name": 1, "chain": 1}},
]

matched_employees = list(db.employees.aggregate(pipeline))
result = matched_employees[0]

# Sort by depth before printing so the output order is deterministic.
ordered_chain = list(result["chain"])
ordered_chain.sort(key=lambda member: member["depth"])
for m in ordered_chain:
    print(f" depth {m['depth']}: {m['name']}")