🧪 Lab 04: Aggregation Pipeline¶
Topics: $match, $group, $project, $addFields, $unwind, $sort, $limit, $skip, $count, $sortByCount
In [ ]:
Copied!
from pymongo import MongoClient, ReadPreference
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from pymongo.errors import ConnectionFailure
from bson import ObjectId
from datetime import datetime, timedelta, timezone
import pprint
# Connection settings: flip USE_ATLAS to target a hosted Atlas cluster
# instead of the local Docker container.
USE_ATLAS = False
ATLAS_URI = "mongodb+srv://<username>:<password>@<cluster>.mongodb.net/?retryWrites=true&w=majority"
DOCKER_URI = "mongodb://127.0.0.1:27017/?directConnection=true"

if USE_ATLAS:
    uri = ATLAS_URI
else:
    uri = DOCKER_URI

# Give up on server selection after 5 seconds instead of the driver default.
client = MongoClient(uri, serverSelectionTimeoutMS=5000)

# Issue a cheap "ping" so an unreachable server is reported here rather than
# in a later cell.
try:
    client.admin.command("ping")
except ConnectionFailure as e:
    print("❌ Connection failed:", e)
else:
    print("✅ Connected to", "Atlas" if USE_ATLAS else "Docker MongoDB")

# Handles shared by the rest of the lab.
db = client["mongo_labs"]
pp = pprint.PrettyPrinter(indent=2)
from pymongo import MongoClient, ReadPreference
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from pymongo.errors import ConnectionFailure
from bson import ObjectId
from datetime import datetime, timedelta, timezone
import pprint
# Connection settings: flip USE_ATLAS to target a hosted Atlas cluster
# instead of the local Docker container.
USE_ATLAS = False
ATLAS_URI = "mongodb+srv://<username>:<password>@<cluster>.mongodb.net/?retryWrites=true&w=majority"
DOCKER_URI = "mongodb://127.0.0.1:27017/?directConnection=true"

if USE_ATLAS:
    uri = ATLAS_URI
else:
    uri = DOCKER_URI

# Give up on server selection after 5 seconds instead of the driver default.
client = MongoClient(uri, serverSelectionTimeoutMS=5000)

# Issue a cheap "ping" so an unreachable server is reported here rather than
# in a later cell.
try:
    client.admin.command("ping")
except ConnectionFailure as e:
    print("❌ Connection failed:", e)
else:
    print("✅ Connected to", "Atlas" if USE_ATLAS else "Docker MongoDB")

# Handles shared by the rest of the lab.
db = client["mongo_labs"]
pp = pprint.PrettyPrinter(indent=2)
Setup: Seed orders¶
In [ ]:
Copied!
# Reset the collection so the cell can be re-run without piling up duplicates.
db.orders.drop()

# Each row is (userName, status, total, day of April 2024, item name, price, qty);
# every order carries exactly one line item.
db.orders.insert_many([
    {
        "userName": user,
        "status": status,
        "total": total,
        "orderedAt": datetime(2024, 4, day),
        "items": [{"name": item, "price": price, "qty": qty}],
    }
    for user, status, total, day, item, price, qty in (
        ("Alice", "delivered", 120.0, 5, "Keyboard", 89.99, 1),
        ("Alice", "pending", 45.0, 12, "USB Hub", 22.99, 2),
        ("Bob", "delivered", 349.99, 10, "Monitor", 349.99, 1),
        ("Bob", "shipped", 89.99, 20, "Webcam", 89.99, 1),
        ("Carol", "delivered", 55.0, 15, "Mousepad", 27.5, 2),
    )
])
print("Orders seeded")
# Reset the collection so the cell can be re-run without piling up duplicates.
db.orders.drop()

# Each row is (userName, status, total, day of April 2024, item name, price, qty);
# every order carries exactly one line item.
db.orders.insert_many([
    {
        "userName": user,
        "status": status,
        "total": total,
        "orderedAt": datetime(2024, 4, day),
        "items": [{"name": item, "price": price, "qty": qty}],
    }
    for user, status, total, day, item, price, qty in (
        ("Alice", "delivered", 120.0, 5, "Keyboard", 89.99, 1),
        ("Alice", "pending", 45.0, 12, "USB Hub", 22.99, 2),
        ("Bob", "delivered", 349.99, 10, "Monitor", 349.99, 1),
        ("Bob", "shipped", 89.99, 20, "Webcam", 89.99, 1),
        ("Carol", "delivered", 55.0, 15, "Mousepad", 27.5, 2),
    )
])
print("Orders seeded")
$match — Filter¶
In [ ]:
Copied!
# Single-stage pipeline: keep only the orders whose status is "delivered".
pipeline = [
    {"$match": {"status": "delivered"}},
]
for order in db.orders.aggregate(pipeline):
    print(f" {order['userName']}: ${order['total']}")
# Single-stage pipeline: keep only the orders whose status is "delivered".
pipeline = [
    {"$match": {"status": "delivered"}},
]
for order in db.orders.aggregate(pipeline):
    print(f" {order['userName']}: ${order['total']}")
$group — Aggregate¶
In [ ]:
Copied!
# Roll orders up by status, then list the statuses by descending revenue.
# "avg" is computed per group but not printed below.
pipeline = [
    {
        "$group": {
            "_id": "$status",
            "totalRevenue": {"$sum": "$total"},
            "count": {"$sum": 1},
            "avg": {"$avg": "$total"},
        }
    },
    {"$sort": {"totalRevenue": -1}},
]
for summary in db.orders.aggregate(pipeline):
    print(f" {summary['_id']}: count={summary['count']}, revenue=${summary['totalRevenue']:.2f}")
# Roll orders up by status, then list the statuses by descending revenue.
# "avg" is computed per group but not printed below.
pipeline = [
    {
        "$group": {
            "_id": "$status",
            "totalRevenue": {"$sum": "$total"},
            "count": {"$sum": 1},
            "avg": {"$avg": "$total"},
        }
    },
    {"$sort": {"totalRevenue": -1}},
]
for summary in db.orders.aggregate(pipeline):
    print(f" {summary['_id']}: count={summary['count']}, revenue=${summary['totalRevenue']:.2f}")
$project — Reshape¶
In [ ]:
Copied!
# Reshape delivered/shipped orders: drop _id, expose userName as "customer",
# keep total, and derive a "tax" field equal to 10% of total.
pipeline = [
    {"$match": {"status": {"$in": ["delivered", "shipped"]}}},
    {
        "$project": {
            "_id": 0,
            "customer": "$userName",
            "total": 1,
            "tax": {"$multiply": ["$total", 0.1]},
        }
    },
]
for order in db.orders.aggregate(pipeline):
    print(f" {order['customer']}: ${order['total']:.2f} + tax ${order['tax']:.2f}")
# Reshape delivered/shipped orders: drop _id, expose userName as "customer",
# keep total, and derive a "tax" field equal to 10% of total.
pipeline = [
    {"$match": {"status": {"$in": ["delivered", "shipped"]}}},
    {
        "$project": {
            "_id": 0,
            "customer": "$userName",
            "total": 1,
            "tax": {"$multiply": ["$total", 0.1]},
        }
    },
]
for order in db.orders.aggregate(pipeline):
    print(f" {order['customer']}: ${order['total']:.2f} + tax ${order['tax']:.2f}")
$addFields — Add without removing¶
In [ ]:
Copied!
# For orders above 50: attach a "discounted" price (90% of total) with
# $addFields, then project total under the name "original" next to it.
pipeline = [
    {"$match": {"total": {"$gt": 50}}},
    {"$addFields": {"discounted": {"$multiply": ["$total", 0.9]}}},
    {
        "$project": {
            "_id": 0,
            "userName": 1,
            "original": "$total",
            "discounted": 1,
        }
    },
]
for order in db.orders.aggregate(pipeline):
    print(f" {order['userName']}: ${order['original']:.2f} → ${order['discounted']:.2f}")
# For orders above 50: attach a "discounted" price (90% of total) with
# $addFields, then project total under the name "original" next to it.
pipeline = [
    {"$match": {"total": {"$gt": 50}}},
    {"$addFields": {"discounted": {"$multiply": ["$total", 0.9]}}},
    {
        "$project": {
            "_id": 0,
            "userName": 1,
            "original": "$total",
            "discounted": 1,
        }
    },
]
for order in db.orders.aggregate(pipeline):
    print(f" {order['userName']}: ${order['original']:.2f} → ${order['discounted']:.2f}")
$unwind — Flatten arrays¶
In [ ]:
Copied!
# Flatten the items array into one output document per line item, then lift
# the item's name, price and qty to top-level fields.
pipeline = [
    {"$unwind": "$items"},
    {
        "$project": {
            "_id": 0,
            "userName": 1,
            "item": "$items.name",
            "price": "$items.price",
            # Dot notation is required to reach the embedded field; the former
            # "$items qty" path matched no field, so "qty" was missing from the
            # result and the print below raised KeyError.
            "qty": "$items.qty",
        }
    },
]
for o in db.orders.aggregate(pipeline):
    print(f" {o['userName']}: {o['item']} x{o['qty']} @ ${o['price']}")
# Flatten the items array into one output document per line item, then lift
# the item's name, price and qty to top-level fields.
pipeline = [
    {"$unwind": "$items"},
    {
        "$project": {
            "_id": 0,
            "userName": 1,
            "item": "$items.name",
            "price": "$items.price",
            # Dot notation is required to reach the embedded field; the former
            # "$items qty" path matched no field, so "qty" was missing from the
            # result and the print below raised KeyError.
            "qty": "$items.qty",
        }
    },
]
for o in db.orders.aggregate(pipeline):
    print(f" {o['userName']}: {o['item']} x{o['qty']} @ ${o['price']}")
$unwind + $group — Aggregate across items¶
In [ ]:
Copied!
# Per-product totals across all orders: flatten the items array, group by
# item name, and rank products by revenue (price * qty summed per item).
pipeline = [
    {"$unwind": "$items"},
    {
        "$group": {
            "_id": "$items.name",
            # "$items.qty" (dot notation) addresses the embedded quantity; the
            # former "$items qty" path referred to a field that does not
            # exist, which made both totals wrong.
            "totalQty": {"$sum": "$items.qty"},
            "totalRevenue": {"$sum": {"$multiply": ["$items.price", "$items.qty"]}},
        }
    },
    {"$sort": {"totalRevenue": -1}},
]
for r in db.orders.aggregate(pipeline):
    print(f" {r['_id']}: qty={r['totalQty']}, revenue=${r['totalRevenue']:.2f}")
# Per-product totals across all orders: flatten the items array, group by
# item name, and rank products by revenue (price * qty summed per item).
pipeline = [
    {"$unwind": "$items"},
    {
        "$group": {
            "_id": "$items.name",
            # "$items.qty" (dot notation) addresses the embedded quantity; the
            # former "$items qty" path referred to a field that does not
            # exist, which made both totals wrong.
            "totalQty": {"$sum": "$items.qty"},
            "totalRevenue": {"$sum": {"$multiply": ["$items.price", "$items.qty"]}},
        }
    },
    {"$sort": {"totalRevenue": -1}},
]
for r in db.orders.aggregate(pipeline):
    print(f" {r['_id']}: qty={r['totalQty']}, revenue=${r['totalRevenue']:.2f}")
$sort + $limit + $skip — Pagination¶
In [ ]:
Copied!
# Newest-first pagination. PAGE is zero-based, so page 0 skips nothing and
# page N skips the first N * SIZE orders.
PAGE, SIZE = 0, 2
pipeline = [
    # _id is a tie-breaker: sorting on orderedAt alone leaves the relative
    # order of equal timestamps unspecified, so rows could repeat or go
    # missing between pages. With distinct dates the output is unchanged.
    {"$sort": {"orderedAt": -1, "_id": -1}},
    {"$skip": PAGE * SIZE},
    {"$limit": SIZE},
    {"$project": {"_id": 0, "userName": 1, "total": 1, "status": 1}},
]
for o in db.orders.aggregate(pipeline):
    print(f" {o['userName']}: ${o['total']}")
# Newest-first pagination. PAGE is zero-based, so page 0 skips nothing and
# page N skips the first N * SIZE orders.
PAGE, SIZE = 0, 2
pipeline = [
    # _id is a tie-breaker: sorting on orderedAt alone leaves the relative
    # order of equal timestamps unspecified, so rows could repeat or go
    # missing between pages. With distinct dates the output is unchanged.
    {"$sort": {"orderedAt": -1, "_id": -1}},
    {"$skip": PAGE * SIZE},
    {"$limit": SIZE},
    {"$project": {"_id": 0, "userName": 1, "total": 1, "status": 1}},
]
for o in db.orders.aggregate(pipeline):
    print(f" {o['userName']}: ${o['total']}")
$count¶
In [ ]:
Copied!
# Count the orders whose total exceeds 100; the count arrives as a single
# document keyed "ordersOver100".
result = list(
    db.orders.aggregate([
        {"$match": {"total": {"$gt": 100}}},
        {"$count": "ordersOver100"},
    ])
)
# An empty result list is reported as 0.
print("Orders > $100:", 0 if not result else result[0]["ordersOver100"])
# Count the orders whose total exceeds 100; the count arrives as a single
# document keyed "ordersOver100".
result = list(
    db.orders.aggregate([
        {"$match": {"total": {"$gt": 100}}},
        {"$count": "ordersOver100"},
    ])
)
# An empty result list is reported as 0.
print("Orders > $100:", 0 if not result else result[0]["ordersOver100"])
$sortByCount¶
In [ ]:
Copied!
# Print the number of orders per userName using a single $sortByCount stage.
orders_per_user = db.orders.aggregate([{"$sortByCount": "$userName"}])
for tally in orders_per_user:
    print(f" {tally['_id']}: {tally['count']} orders")
# Print the number of orders per userName using a single $sortByCount stage.
orders_per_user = db.orders.aggregate([{"$sortByCount": "$userName"}])
for tally in orders_per_user:
    print(f" {tally['_id']}: {tally['count']} orders")