In [ ]:
Copied!
from pymongo import MongoClient, ReadPreference
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from pymongo.errors import ConnectionFailure
from bson import ObjectId
from datetime import datetime, timedelta, timezone
import pprint
USE_ATLAS = False
ATLAS_URI = "mongodb+srv://<username>:<password>@<cluster>.mongodb.net/?retryWrites=true&w=majority"
DOCKER_URI = "mongodb://127.0.0.1:27017/?directConnection=true"
uri = ATLAS_URI if USE_ATLAS else DOCKER_URI
client = MongoClient(uri, serverSelectionTimeoutMS=5000)
try:
client.admin.command("ping")
print("✅ Connected to", "Atlas" if USE_ATLAS else "Docker MongoDB")
except ConnectionFailure as e:
print("❌ Connection failed:", e)
db = client["mongo_labs"]
pp = pprint.PrettyPrinter(indent=2)
from pymongo import MongoClient, ReadPreference
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from pymongo.errors import ConnectionFailure
from bson import ObjectId
from datetime import datetime, timedelta, timezone
import pprint
USE_ATLAS = False
ATLAS_URI = "mongodb+srv://<username>:<password>@<cluster>.mongodb.net/?retryWrites=true&w=majority"
DOCKER_URI = "mongodb://127.0.0.1:27017/?directConnection=true"
uri = ATLAS_URI if USE_ATLAS else DOCKER_URI
client = MongoClient(uri, serverSelectionTimeoutMS=5000)
try:
client.admin.command("ping")
print("✅ Connected to", "Atlas" if USE_ATLAS else "Docker MongoDB")
except ConnectionFailure as e:
print("❌ Connection failed:", e)
db = client["mongo_labs"]
pp = pprint.PrettyPrinter(indent=2)
Setup: Accounts¶
In [ ]:
Copied!
db.accounts.drop()
db.transfers.drop()
db.accounts.insert_many([{"_id":"acct-alice","owner":"Alice","balance":1000.0},{"_id":"acct-bob","owner":"Bob","balance":200.0}])
print("Accounts: Alice=$1000, Bob=$200")
db.accounts.drop()
db.transfers.drop()
db.accounts.insert_many([{"_id":"acct-alice","owner":"Alice","balance":1000.0},{"_id":"acct-bob","owner":"Bob","balance":200.0}])
print("Accounts: Alice=$1000, Bob=$200")
Fund Transfer Transaction¶
In [ ]:
Copied!
AMOUNT = 150.0
with client.start_session() as session:
try:
with session.start_transaction(read_concern={"level":"snapshot"},write_concern={"w":"majority"}):
accounts = db.get_collection("accounts", session=session)
result = db.accounts.update_one({"_id":"acct-alice","balance":{"$gte":AMOUNT}},{"$inc":{"balance":-AMOUNT}},session=session)
if result.modified_count == 0:
raise ValueError("Insufficient funds")
db.accounts.update_one({"_id":"acct-bob"},{"$inc":{"balance":AMOUNT}},session=session)
db.transfers.insert_one({"from":"acct-alice","to":"acct-bob","amount":AMOUNT,"ts":datetime.now(timezone.utc)},session=session)
print(f"✅ Transfer ${AMOUNT} committed")
except Exception as e:
print(f"❌ Aborted: {e}")
alice = db.accounts.find_one({"_id":"acct-alice"})
bob = db.accounts.find_one({"_id":"acct-bob"})
print(f"After: Alice=${alice['balance']}, Bob=${bob['balance']}")
AMOUNT = 150.0
with client.start_session() as session:
try:
with session.start_transaction(read_concern={"level":"snapshot"},write_concern={"w":"majority"}):
accounts = db.get_collection("accounts", session=session)
result = db.accounts.update_one({"_id":"acct-alice","balance":{"$gte":AMOUNT}},{"$inc":{"balance":-AMOUNT}},session=session)
if result.modified_count == 0:
raise ValueError("Insufficient funds")
db.accounts.update_one({"_id":"acct-bob"},{"$inc":{"balance":AMOUNT}},session=session)
db.transfers.insert_one({"from":"acct-alice","to":"acct-bob","amount":AMOUNT,"ts":datetime.now(timezone.utc)},session=session)
print(f"✅ Transfer ${AMOUNT} committed")
except Exception as e:
print(f"❌ Aborted: {e}")
alice = db.accounts.find_one({"_id":"acct-alice"})
bob = db.accounts.find_one({"_id":"acct-bob"})
print(f"After: Alice=${alice['balance']}, Bob=${bob['balance']}")
Abort Demo — Insufficient Funds¶
In [ ]:
Copied!
with client.start_session() as session:
try:
with session.start_transaction(write_concern={"w":"majority"}):
result = db.accounts.update_one({"_id":"acct-alice","balance":{"$gte":5000}},{"$inc":{"balance":-5000}},session=session)
if result.modified_count == 0:
raise ValueError("Insufficient funds")
db.accounts.update_one({"_id":"acct-bob"},{"$inc":{"balance":5000}},session=session)
except Exception as e:
print(f"❌ Aborted: {e}")
alice = db.accounts.find_one({"_id":"acct-alice"})
bob = db.accounts.find_one({"_id":"acct-bob"})
print(f"After abort: Alice=${alice['balance']}, Bob=${bob['balance']} (unchanged)")
with client.start_session() as session:
try:
with session.start_transaction(write_concern={"w":"majority"}):
result = db.accounts.update_one({"_id":"acct-alice","balance":{"$gte":5000}},{"$inc":{"balance":-5000}},session=session)
if result.modified_count == 0:
raise ValueError("Insufficient funds")
db.accounts.update_one({"_id":"acct-bob"},{"$inc":{"balance":5000}},session=session)
except Exception as e:
print(f"❌ Aborted: {e}")
alice = db.accounts.find_one({"_id":"acct-alice"})
bob = db.accounts.find_one({"_id":"acct-bob"})
print(f"After abort: Alice=${alice['balance']}, Bob=${bob['balance']} (unchanged)")
Retry Pattern with with_transaction()¶
In [ ]:
Copied!
def transfer_callback(session):
db.accounts.update_one({"_id":"acct-alice"},{"$inc":{"balance":25}},session=session)
db.accounts.update_one({"_id":"acct-bob"},{"$inc":{"balance":-25}},session=session)
with client.start_session() as session:
session.with_transaction(transfer_callback, write_concern={"w":"majority"})
print("✅ Retry-safe transfer committed")
alice = db.accounts.find_one({"_id":"acct-alice"})
print(f"Final: Alice=${alice['balance']}")
def transfer_callback(session):
db.accounts.update_one({"_id":"acct-alice"},{"$inc":{"balance":25}},session=session)
db.accounts.update_one({"_id":"acct-bob"},{"$inc":{"balance":-25}},session=session)
with client.start_session() as session:
session.with_transaction(transfer_callback, write_concern={"w":"majority"})
print("✅ Retry-safe transfer committed")
alice = db.accounts.find_one({"_id":"acct-alice"})
print(f"Final: Alice=${alice['balance']}")