In [ ]:
Copied!
from pymongo import MongoClient, ReadPreference
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from pymongo.errors import ConnectionFailure
from bson import ObjectId
from datetime import datetime, timedelta, timezone
import pprint
USE_ATLAS = False
ATLAS_URI = "mongodb+srv://<username>:<password>@<cluster>.mongodb.net/?retryWrites=true&w=majority"
DOCKER_URI = "mongodb://127.0.0.1:27017/?directConnection=true"
uri = ATLAS_URI if USE_ATLAS else DOCKER_URI
client = MongoClient(uri, serverSelectionTimeoutMS=5000)
try:
client.admin.command("ping")
print("✅ Connected to", "Atlas" if USE_ATLAS else "Docker MongoDB")
except ConnectionFailure as e:
print("❌ Connection failed:", e)
db = client["mongo_labs"]
pp = pprint.PrettyPrinter(indent=2)
from pymongo import MongoClient, ReadPreference
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from pymongo.errors import ConnectionFailure
from bson import ObjectId
from datetime import datetime, timedelta, timezone
import pprint
USE_ATLAS = False
ATLAS_URI = "mongodb+srv://<username>:<password>@<cluster>.mongodb.net/?retryWrites=true&w=majority"
DOCKER_URI = "mongodb://127.0.0.1:27017/?directConnection=true"
uri = ATLAS_URI if USE_ATLAS else DOCKER_URI
client = MongoClient(uri, serverSelectionTimeoutMS=5000)
try:
client.admin.command("ping")
print("✅ Connected to", "Atlas" if USE_ATLAS else "Docker MongoDB")
except ConnectionFailure as e:
print("❌ Connection failed:", e)
db = client["mongo_labs"]
pp = pprint.PrettyPrinter(indent=2)
Replica Set Status¶
In [ ]:
Copied!
admin_db = client["admin"]
status = admin_db.command("replSetGetStatus")
for m in status["members"]:
print(f" {m['name']:25s} {m['stateStr']:10s} health={m['health']}")
admin_db = client["admin"]
status = admin_db.command("replSetGetStatus")
for m in status["members"]:
print(f" {m['name']:25s} {m['stateStr']:10s} health={m['health']}")
Write Concerns¶
In [ ]:
Copied!
# w=majority: survives primary failure
db.orders.insert_one({"test":"w-majority"},write_concern={"w":"majority","j":True})
print("Inserted with w=majority + journal")
# w=1: primary-only (faster, less durable)
db.events.insert_one({"test":"w-1"},write_concern={"w":1})
print("Inserted with w=1 (primary-only)")
# w=majority: survives primary failure
db.orders.insert_one({"test":"w-majority"},write_concern={"w":"majority","j":True})
print("Inserted with w=majority + journal")
# w=1: primary-only (faster, less durable)
db.events.insert_one({"test":"w-1"},write_concern={"w":1})
print("Inserted with w=1 (primary-only)")
Read Preferences¶
In [ ]:
Copied!
from pymongo import ReadPreference
# Primary: consisent, slower
primary_coll = db.get_collection("orders",read_preference=ReadPreference.PRIMARY)
print("Read from PRIMARY:", primary_coll.count_documents({}))
# SecondaryPreferred: may be stale, faster
sec_coll = db.get_collection("orders",read_preference=ReadPreference.SECONDARY_PREFERRED)
print("Read from SECONDARY_PREFERRED:", sec_coll.count_documents({}))
from pymongo import ReadPreference
# Primary: consisent, slower
primary_coll = db.get_collection("orders",read_preference=ReadPreference.PRIMARY)
print("Read from PRIMARY:", primary_coll.count_documents({}))
# SecondaryPreferred: may be stale, faster
sec_coll = db.get_collection("orders",read_preference=ReadPreference.SECONDARY_PREFERRED)
print("Read from SECONDARY_PREFERRED:", sec_coll.count_documents({}))
Read Concerns¶
In [ ]:
Copied!
from pymongo.read_concern import ReadConcern
majority_coll = db.get_collection("orders",read_concern=ReadConcern("majority"))
for o in majority_coll.find({},{"_id":0,"test":1}).limit(3):
print(f" {o}")
from pymongo.read_concern import ReadConcern
majority_coll = db.get_collection("orders",read_concern=ReadConcern("majority"))
for o in majority_coll.find({},{"_id":0,"test":1}).limit(3):
print(f" {o}")
Replication Lag¶
In [ ]:
Copied!
rs_status = admin_db.command("replSetGetStatus")
for m in rs_status["members"]:
if m["stateStr"] != "PRIMARY":
optime = m.get("optimeDate")
if optime:
lag = (datetime.now(timezone.utc) - optime).total_seconds()
print(f" {m['name']}: lag ~{lag:.1f}s")
else:
print(f" {m['name']}: lag N/A")
rs_status = admin_db.command("replSetGetStatus")
for m in rs_status["members"]:
if m["stateStr"] != "PRIMARY":
optime = m.get("optimeDate")
if optime:
lag = (datetime.now(timezone.utc) - optime).total_seconds()
print(f" {m['name']}: lag ~{lag:.1f}s")
else:
print(f" {m['name']}: lag N/A")