Lab 4: Real Data Ingestion with MongoDB & Vector DB
Level: Intermediate | Duration: 2.5 hours
Objective
Build a complete ingestion pipeline from structured data to searchable vector chunks.
What You'll Learn
- Load data from local JSON or MongoDB Atlas
- Normalize records into retrieval-ready text
- Apply chunking with overlap
- Batch-generate embeddings efficiently
- Store vectors with metadata in MongoDB Atlas
- Run Atlas $vectorSearch queries and inspect results
- Measure ingestion throughput
Data Loading Options
- Local JSON (../data/restaurants_sample.json) - easiest for first run
- MongoDB Atlas (free tier) - production-like workflow
- Custom records (same normalization function applies)
1) Imports and Configuration
from pathlib import Path
from typing import Dict, List, Any
import json
import os
import time

from sentence_transformers import SentenceTransformer
from dotenv import load_dotenv

# Optional Atlas import (only required if using SOURCE_MODE="atlas")
try:
    from pymongo import MongoClient
except Exception:
    MongoClient = None

# Load ATLAS_URI from the first .env found while walking up directories.
for _base in [Path.cwd(), *Path.cwd().parents]:
    _candidate = _base / ".env"
    if _candidate.exists():
        load_dotenv(dotenv_path=_candidate, override=False)
        break

# -------------------------------
# Configuration
# -------------------------------
SOURCE_MODE = "atlas"  # "local" or "atlas"
MAX_RECORDS = 200  # keep small while learning; raise later
RUN_ATLAS_VECTOR_SEARCH = True  # Set False to skip Atlas operations

# Local file path (relative to this notebook's folder)
LOCAL_JSON_PATH = Path("../data/restaurants_sample.json")

# Atlas settings (used only when SOURCE_MODE == "atlas")
ATLAS_URI = os.getenv("ATLAS_URI", "").strip()
ATLAS_DB_NAME = "sample_restaurants"
ATLAS_COLLECTION = "restaurants"

# Atlas vector target for this lab
ATLAS_VECTOR_DB = "rag_lab"
ATLAS_VECTOR_COLLECTION = "restaurant_chunks"
ATLAS_VECTOR_INDEX_NAME = "vector_index"

# Chunking
CHUNK_SIZE_CHARS = 280
CHUNK_OVERLAP_CHARS = 60

# Embedding model
MODEL_NAME = "all-MiniLM-L6-v2"

print("Configuration loaded")
print(f"SOURCE_MODE={SOURCE_MODE}, MAX_RECORDS={MAX_RECORDS}")
print(f"RUN_ATLAS_VECTOR_SEARCH={RUN_ATLAS_VECTOR_SEARCH}")
print(f"RAW source collection: {ATLAS_DB_NAME}.{ATLAS_COLLECTION}")
print(f"VECTOR collection: {ATLAS_VECTOR_DB}.{ATLAS_VECTOR_COLLECTION}")
Configuration loaded
SOURCE_MODE=atlas, MAX_RECORDS=200
RUN_ATLAS_VECTOR_SEARCH=True
RAW source collection: sample_restaurants.restaurants
VECTOR collection: rag_lab.restaurant_chunks
Data Model Note (Important)
This lab intentionally uses two Atlas collections:
- sample_restaurants.restaurants -> raw source records (no embeddings)
- rag_lab.restaurant_chunks -> chunked records + embedding vectors
$vectorSearch runs against rag_lab.restaurant_chunks only.
2) Helper Functions
def load_local_json(path: Path) -> List[Dict[str, Any]]:
    if not path.exists():
        raise FileNotFoundError(f"Local JSON not found: {path.resolve()}")
    with path.open("r", encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, list):
        raise ValueError("Expected top-level JSON array of records")
    return data


def load_from_atlas(uri: str, db_name: str, coll_name: str, limit: int) -> List[Dict[str, Any]]:
    if MongoClient is None:
        raise ImportError("pymongo is not installed. Run: pip install pymongo")
    if "<user>" in uri or "<password>" in uri or "<cluster>" in uri:
        raise ValueError("Set a real ATLAS_URI before using SOURCE_MODE='atlas'")
    client = MongoClient(uri, serverSelectionTimeoutMS=15000)
    client.admin.command("ping")
    cursor = client[db_name][coll_name].find({}, limit=limit)
    records = list(cursor)
    return records


def record_to_text(record: Dict[str, Any]) -> str:
    """Normalize restaurant-like records into a single retrievable text block."""
    rid = str(record.get("id", record.get("_id", "unknown_id")))
    name = str(record.get("name", "Unknown Restaurant"))
    cuisine = str(record.get("cuisine", "Unknown Cuisine"))
    address = str(record.get("address", "Unknown Address"))
    rating = str(record.get("rating", "N/A"))
    description = str(record.get("description", ""))
    reviews = record.get("reviews", [])
    if isinstance(reviews, list):
        review_text = " | ".join(str(r) for r in reviews[:5])
    else:
        review_text = str(reviews)
    text = (
        f"Restaurant ID: {rid}. "
        f"Name: {name}. "
        f"Cuisine: {cuisine}. "
        f"Address: {address}. "
        f"Rating: {rating}. "
        f"Description: {description}. "
        f"Reviews: {review_text}."
    )
    return text


def chunk_text(text: str, chunk_size: int = 280, overlap: int = 60) -> List[str]:
    """Simple character chunking with overlap for long records."""
    text = " ".join(text.split())  # normalize spaces
    if len(text) <= chunk_size:
        return [text]
    chunks: List[str] = []
    start = 0
    step = max(1, chunk_size - overlap)
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            break
        start += step
    return chunks


def build_chunk_documents(records: List[Dict[str, Any]], chunk_size: int, overlap: int) -> List[Dict[str, Any]]:
    chunk_docs: List[Dict[str, Any]] = []
    for rec_idx, record in enumerate(records):
        base_text = record_to_text(record)
        chunks = chunk_text(base_text, chunk_size=chunk_size, overlap=overlap)
        src_id = str(record.get("id", record.get("_id", f"record_{rec_idx}")))
        for chunk_idx, chunk in enumerate(chunks):
            chunk_docs.append(
                {
                    "chunk_id": f"{src_id}_chunk_{chunk_idx}",
                    "source_id": src_id,
                    "text": chunk,
                    "metadata": {
                        "source_id": src_id,
                        "name": str(record.get("name", "Unknown Restaurant")),
                        "cuisine": str(record.get("cuisine", "Unknown Cuisine")),
                        "chunk_index": chunk_idx,
                        "num_chunks_for_source": len(chunks),
                    },
                }
            )
    return chunk_docs
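To make the overlap arithmetic concrete, here is a small self-contained sketch. It repeats the same sliding-window logic as chunk_text under a hypothetical name (sliding_chunks) and adds a hypothetical helper, expected_chunk_count, that predicts how many chunks a text of a given length produces. Neither name is part of the lab code; they exist only for this illustration.

```python
import math
from typing import List


def sliding_chunks(text: str, chunk_size: int = 280, overlap: int = 60) -> List[str]:
    """Same windowing as chunk_text in this lab (hypothetical copy for illustration)."""
    text = " ".join(text.split())
    if len(text) <= chunk_size:
        return [text]
    step = max(1, chunk_size - overlap)  # each window starts `step` chars after the last
    chunks: List[str] = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            break
        start += step
    return chunks


def expected_chunk_count(length: int, chunk_size: int = 280, overlap: int = 60) -> int:
    """Predict the number of windows for a normalized text of `length` characters."""
    if length <= chunk_size:
        return 1
    step = max(1, chunk_size - overlap)
    return math.ceil((length - chunk_size) / step) + 1


# 500 characters with no whitespace, so normalization leaves the length unchanged
sample = "".join(chr(97 + i % 26) for i in range(500))
chunks = sliding_chunks(sample)
print("chunks:", len(chunks), "predicted:", expected_chunk_count(len(sample)))
print("tail of chunk 0 equals head of chunk 1:", chunks[0][-60:] == chunks[1][:60])
```

With the lab defaults the window advances 220 characters at a time (280 - 60), so a 500-character record yields two chunks and a 501-character record yields three.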
3) Load Raw Records
# Set to True to seed `sample_restaurants.restaurants` from the local JSON.
# Note: seeding clears any existing documents in that collection first.
SEED_ATLAS_FROM_LOCAL = True

if SEED_ATLAS_FROM_LOCAL and RUN_ATLAS_VECTOR_SEARCH:
    if MongoClient is None:
        raise ImportError("pymongo is not installed. Run: pip install pymongo")
    if not ATLAS_URI:
        raise ValueError("ATLAS_URI not set. Put it in a local .env file or environment variable.")
    # Load local sample data and seed the raw source collection
    seed_records = load_local_json(LOCAL_JSON_PATH)
    seed_client = MongoClient(ATLAS_URI, serverSelectionTimeoutMS=20000)
    seed_coll = seed_client[ATLAS_DB_NAME][ATLAS_COLLECTION]
    # Clear and insert fresh data
    seed_coll.delete_many({})
    if seed_records:
        seed_coll.insert_many(seed_records)
        print(f"✅ Seeded {seed_coll.count_documents({})} records into {ATLAS_DB_NAME}.{ATLAS_COLLECTION}")
    else:
        print("No records to seed")
elif not RUN_ATLAS_VECTOR_SEARCH:
    print("Seeding skipped (enable RUN_ATLAS_VECTOR_SEARCH to seed Atlas)")
else:
    print("Seeding skipped (SEED_ATLAS_FROM_LOCAL = False)")
✅ Seeded 10 records into sample_restaurants.restaurants
if SOURCE_MODE == "local":
    raw_records = load_local_json(LOCAL_JSON_PATH)[:MAX_RECORDS]
    print(f"Loaded {len(raw_records)} local records from {LOCAL_JSON_PATH}")
elif SOURCE_MODE == "atlas":
    raw_records = load_from_atlas(ATLAS_URI, ATLAS_DB_NAME, ATLAS_COLLECTION, MAX_RECORDS)
    print(f"Loaded {len(raw_records)} records from Atlas: {ATLAS_DB_NAME}.{ATLAS_COLLECTION}")
else:
    raise ValueError("SOURCE_MODE must be 'local' or 'atlas'")

print("Sample record keys:", list(raw_records[0].keys()) if raw_records else "No records")
Loaded 10 records from Atlas: sample_restaurants.restaurants
Sample record keys: ['_id', 'id', 'name', 'address', 'cuisine', 'phone', 'description', 'rating', 'reviews']
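Because record_to_text silently substitutes defaults such as "Unknown Cuisine" or "N/A" for missing fields, it can help to check field coverage before chunking. The sketch below is one possible way to do that. The helper name field_coverage and the demo records are made up for illustration; the field list simply mirrors the keys that record_to_text reads.

```python
from collections import Counter
from typing import Any, Dict, List

# Keys that record_to_text in this lab reads from each record
EXPECTED_FIELDS = ["id", "name", "cuisine", "address", "rating", "description", "reviews"]


def field_coverage(records: List[Dict[str, Any]], fields: List[str]) -> Dict[str, float]:
    """Return, per field, the fraction of records where it is present and non-empty."""
    if not records:
        return {field: 0.0 for field in fields}
    present: Counter = Counter()
    for record in records:
        for field in fields:
            if record.get(field) not in (None, "", []):
                present[field] += 1
    return {field: present[field] / len(records) for field in fields}


# Made-up records, for illustration only
demo_records = [
    {"id": "a1", "name": "Cafe One", "cuisine": "French", "rating": 4.1, "reviews": ["Nice"]},
    {"id": "a2", "name": "Cafe Two", "cuisine": "", "address": "1 Example Road"},
]

coverage = field_coverage(demo_records, EXPECTED_FIELDS)
for field, share in coverage.items():
    print(f"{field:12s} {share:.0%}")
```

In the notebook, the same call could be made with raw_records in place of demo_records. A field with low coverage means many chunks will carry placeholder text rather than real content.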
4) Normalize and Chunk Records
chunk_docs = build_chunk_documents(
    raw_records,
    chunk_size=CHUNK_SIZE_CHARS,
    overlap=CHUNK_OVERLAP_CHARS,
)

print(f"Raw records: {len(raw_records)}")
print(f"Chunk documents: {len(chunk_docs)}")
if raw_records:
    ratio = len(chunk_docs) / len(raw_records)
    print(f"Avg chunks per record: {ratio:.2f}")

print("\nSample chunk:")
print("-" * 80)
print(chunk_docs[0]["text"] if chunk_docs else "No chunks")
Raw records: 10
Chunk documents: 20
Avg chunks per record: 2.00

Sample chunk:
--------------------------------------------------------------------------------
Restaurant ID: rest_001. Name: Bella Italia. Cuisine: Italian. Address: 123 Main Street, New York, NY 10001. Rating: 4.5. Description: Authentic Italian restaurant serving traditional pasta and risotto dishes with imported ingredients. Reviews: Amazing pasta, felt like being in R
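Note that the sample chunk ends mid-word ("being in R"), which is a side effect of cutting at a fixed character offset. One alternative, shown here as a rough sketch and not as the lab's method, is to pack whole words into each chunk and carry trailing words forward as overlap. The function name chunk_text_on_words is hypothetical. It assumes overlap is smaller than chunk_size and that no single word is longer than chunk_size; if either assumption fails, a chunk may exceed the limit.

```python
from typing import List


def chunk_text_on_words(text: str, chunk_size: int = 280, overlap: int = 60) -> List[str]:
    """Pack whole words into chunks of at most chunk_size chars (hypothetical variant)."""
    chunks: List[str] = []
    current: List[str] = []  # words in the chunk being built
    current_len = 0          # length of " ".join(current)
    for word in text.split():
        added = len(word) + (1 if current else 0)
        if current and current_len + added > chunk_size:
            chunks.append(" ".join(current))
            # Carry over trailing words totalling at most `overlap` characters
            tail: List[str] = []
            tail_len = 0
            for prev in reversed(current):
                extra = len(prev) + (1 if tail else 0)
                if tail_len + extra > overlap:
                    break
                tail.insert(0, prev)
                tail_len += extra
            current, current_len = tail, tail_len
            added = len(word) + (1 if current else 0)
        current.append(word)
        current_len += added
    if current:
        chunks.append(" ".join(current))
    return chunks


demo_text = " ".join(f"word{i}" for i in range(200))
word_chunks = chunk_text_on_words(demo_text, chunk_size=80, overlap=20)
print(len(word_chunks), "chunks; longest:", max(len(c) for c in word_chunks))
print(word_chunks[0])
print(word_chunks[1])
```

The trade-off is that chunk lengths vary slightly, so the overlap is "up to" the configured size and the chunk count is no longer a simple function of text length.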
5) Initialize Embedding Model
model = SentenceTransformer(MODEL_NAME)
print(f"Model loaded: {MODEL_NAME}")
Model loaded: all-MiniLM-L6-v2
6) Atlas Setup and Ingestion
One-time secure setup (no password in Git)
- Create a local .env file in the repo root (make sure it is gitignored).
- Add your URI there:
  ATLAS_URI="mongodb+srv://<user>:<password>@<cluster>.mongodb.net/?retryWrites=true&w=majority"
- Keep RUN_ATLAS_VECTOR_SEARCH = True after this setup.
Atlas Search Index JSON (create once in Atlas UI)
Atlas UI -> Atlas Search -> Create Search Index -> JSON editor:
{
  "fields": [
    {
      "type": "vector",
      "path": "embedding",
      "numDimensions": 384,
      "similarity": "cosine"
    }
  ]
}
Index name in this lab: vector_index.
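The numDimensions value has to match the length of the vectors stored in the embedding field; 384 is the output size of all-MiniLM-L6-v2. A quick pre-insert check can catch a mismatch (for example, after swapping the model) before Atlas is involved. The sketch below is a minimal illustration: the helper name find_dimension_mismatches and the demo documents are hypothetical.

```python
from typing import Any, Dict, List

INDEX_NUM_DIMENSIONS = 384  # must match "numDimensions" in the index definition


def find_dimension_mismatches(
    docs: List[Dict[str, Any]], expected_dim: int, field: str = "embedding"
) -> List[Any]:
    """Return the _id of every document whose vector is missing or has the wrong length."""
    bad_ids: List[Any] = []
    for doc in docs:
        vector = doc.get(field)
        if not isinstance(vector, (list, tuple)) or len(vector) != expected_dim:
            bad_ids.append(doc.get("_id"))
    return bad_ids


# Made-up documents, for illustration only
demo_docs = [
    {"_id": "ok_chunk", "embedding": [0.0] * 384},
    {"_id": "short_chunk", "embedding": [0.0] * 128},
    {"_id": "missing_chunk"},
]
mismatches = find_dimension_mismatches(demo_docs, INDEX_NUM_DIMENSIONS)
print(mismatches)
```

In the ingestion cell, the same check could be run on each batch of documents just before it is inserted.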
BATCH_SIZE = 32


def batched(seq: List[Any], size: int):
    for i in range(0, len(seq), size):
        yield seq[i : i + size]


if not RUN_ATLAS_VECTOR_SEARCH:
    print("Atlas steps skipped. Set RUN_ATLAS_VECTOR_SEARCH=True after configuring ATLAS_URI in .env")
else:
    if MongoClient is None:
        raise ImportError("pymongo is not installed. Run: pip install pymongo")
    if not ATLAS_URI:
        raise ValueError("ATLAS_URI not set. Put it in a local .env file or environment variable.")

    atlas_client = MongoClient(ATLAS_URI, serverSelectionTimeoutMS=20000)
    atlas_client.admin.command("ping")
    atlas_coll = atlas_client[ATLAS_VECTOR_DB][ATLAS_VECTOR_COLLECTION]

    # Recreate collection contents for deterministic reruns
    atlas_coll.delete_many({})

    start_time = time.time()
    inserted = 0
    for batch in batched(chunk_docs, BATCH_SIZE):
        texts = [item["text"] for item in batch]
        embs = model.encode(texts, show_progress_bar=False)
        docs = []
        for item, emb in zip(batch, embs):
            docs.append(
                {
                    "_id": item["chunk_id"],
                    "text": item["text"],
                    "source_id": item["metadata"]["source_id"],
                    "name": item["metadata"]["name"],
                    "cuisine": item["metadata"]["cuisine"],
                    "chunk_index": item["metadata"]["chunk_index"],
                    "num_chunks_for_source": item["metadata"]["num_chunks_for_source"],
                    "embedding": emb.tolist(),
                }
            )
        atlas_coll.insert_many(docs)
        inserted += len(docs)

    elapsed = time.time() - start_time
    rate = inserted / elapsed if elapsed > 0 else 0.0
    print(f"Inserted chunks: {inserted}")
    print(f"Time: {elapsed:.2f}s")
    print(f"Throughput: {rate:.2f} chunks/sec")
    print(f"Atlas count: {atlas_coll.count_documents({})}")

    # Ensure vector search index exists on the VECTOR collection
    search_indexes = []
    try:
        search_indexes = list(atlas_coll.list_search_indexes())
    except Exception as e:
        print(f"Could not list search indexes: {type(e).__name__}: {e}")

    if not any(idx.get("name") == ATLAS_VECTOR_INDEX_NAME for idx in search_indexes):
        print(f"Search index '{ATLAS_VECTOR_INDEX_NAME}' not found on {ATLAS_VECTOR_DB}.{ATLAS_VECTOR_COLLECTION}.")
        print("Attempting to create it programmatically...")
        try:
            atlas_coll.create_search_index(
                {
                    "name": ATLAS_VECTOR_INDEX_NAME,
                    # Without an explicit type the request is treated as a full-text
                    # search index; "vectorSearch" needs a recent PyMongo/Atlas version.
                    "type": "vectorSearch",
                    "definition": {
                        "fields": [
                            {
                                "type": "vector",
                                "path": "embedding",
                                "numDimensions": 384,
                                "similarity": "cosine",
                            }
                        ]
                    },
                }
            )
            print("Index creation requested.")
        except Exception as e:
            print("Auto-create failed. Create index in Atlas UI manually.")
            print(f"Details: {type(e).__name__}: {e}")

    # Wait briefly for index to become queryable
    max_wait_s = 60
    waited = 0
    is_queryable = False
    while waited < max_wait_s:
        try:
            idxs = list(atlas_coll.list_search_indexes())
            for idx in idxs:
                if idx.get("name") == ATLAS_VECTOR_INDEX_NAME and idx.get("queryable", False):
                    is_queryable = True
                    break
            if is_queryable:
                break
        except Exception:
            pass
        time.sleep(5)
        waited += 5

    if is_queryable:
        print(f"Search index '{ATLAS_VECTOR_INDEX_NAME}' is queryable.")
    else:
        print(f"Search index '{ATLAS_VECTOR_INDEX_NAME}' is not queryable yet.")
        print("If queries return empty, wait another minute and rerun cells 7-8.")
Inserted chunks: 20
Time: 0.41s
Throughput: 48.60 chunks/sec
Atlas count: 20
Search index 'vector_index' is queryable.
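The throughput figure above blends two different costs: computing embeddings and writing to Atlas. With only 20 chunks it is also too small a sample to say much. When comparing runs, it may help to time the two stages separately. The sketch below shows one way to structure that, using stand-in functions so it runs without a model or a database. The names timed_ingest, fake_embed, and fake_write are hypothetical; in the notebook, the embed step would wrap model.encode and the write step would wrap atlas_coll.insert_many.

```python
import time
from typing import Any, Callable, Dict, Iterator, List


def batched(seq: List[Any], size: int) -> Iterator[List[Any]]:
    """Yield consecutive slices of at most `size` items."""
    for i in range(0, len(seq), size):
        yield seq[i : i + size]


def timed_ingest(
    items: List[Any],
    batch_size: int,
    embed_fn: Callable[[List[Any]], List[Any]],
    write_fn: Callable[[List[Any], List[Any]], None],
) -> Dict[str, Any]:
    """Run embed + write per batch and accumulate wall-clock time for each stage."""
    totals: Dict[str, Any] = {"embed_s": 0.0, "write_s": 0.0, "items": 0}
    for batch in batched(items, batch_size):
        t0 = time.perf_counter()
        vectors = embed_fn(batch)
        t1 = time.perf_counter()
        write_fn(batch, vectors)
        t2 = time.perf_counter()
        totals["embed_s"] += t1 - t0
        totals["write_s"] += t2 - t1
        totals["items"] += len(batch)
    return totals


# Stand-ins so the sketch runs anywhere (not real embedding or database calls)
stored: List[Any] = []


def fake_embed(batch: List[str]) -> List[List[float]]:
    return [[float(len(text))] for text in batch]


def fake_write(batch: List[str], vectors: List[List[float]]) -> None:
    stored.extend(zip(batch, vectors))


stats = timed_ingest([f"chunk {i}" for i in range(70)], 32, fake_embed, fake_write)
print(f"items={stats['items']} embed={stats['embed_s']:.4f}s write={stats['write_s']:.4f}s")
```

Seeing the two totals side by side indicates whether a larger BATCH_SIZE (fewer round trips) or a faster embedding setup is the more promising change.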
7) Query Atlas Vector Search
This cell requires RUN_ATLAS_VECTOR_SEARCH = True and a ready Atlas Search index.
queries = [
    "best sushi restaurant with omakase",
    "italian pasta and risotto",
    "spicy mexican tacos",
]

if not RUN_ATLAS_VECTOR_SEARCH:
    print("Query step skipped. Enable RUN_ATLAS_VECTOR_SEARCH to run Atlas queries.")
else:
    for q in queries:
        q_emb = model.encode(q).tolist()
        pipeline = [
            {
                "$vectorSearch": {
                    "index": ATLAS_VECTOR_INDEX_NAME,
                    "path": "embedding",
                    "queryVector": q_emb,
                    "numCandidates": 30,
                    "limit": 3,
                }
            },
            {
                "$project": {
                    "_id": 0,
                    "text": 1,
                    "name": 1,
                    "cuisine": 1,
                    "source_id": 1,
                    "chunk_index": 1,
                    "score": {"$meta": "vectorSearchScore"},
                }
            },
        ]
        results = list(atlas_coll.aggregate(pipeline))

        print("\n" + "=" * 90)
        print(f"Query: {q}")
        print("-" * 90)
        if not results:
            print("No results returned.")
            print("Most common cause: missing/not-ready Atlas Search index 'vector_index'.")
            print("Also verify collection = rag_lab.restaurant_chunks and embedding field exists.")
            continue
        for rank, r in enumerate(results, 1):
            print(f"{rank}. score={r['score']:.4f} | name={r.get('name')} | cuisine={r.get('cuisine')}")
            print(f"   {r.get('text', '')[:140]}...")
==========================================================================================
Query: best sushi restaurant with omakase
------------------------------------------------------------------------------------------
1. score=0.8485 | name=Sakura Sushi | cuisine=Japanese
Restaurant ID: rest_003. Name: Sakura Sushi. Cuisine: Japanese. Address: 789 5th Avenue, New York, NY 10022. Rating: 4.8. Description: Premi...
2. score=0.8436 | name=Sakura Sushi | cuisine=Japanese
tions. Reviews: Best sushi experience in New York | Incredible omakase selection | Chef really knows his craft....
3. score=0.7219 | name=Dragon Palace | cuisine=Chinese
seafood daily. Reviews: Freshest seafood in Chinatown | Dim sum is absolutely delicious | Busy but worth the wait....
==========================================================================================
Query: italian pasta and risotto
------------------------------------------------------------------------------------------
1. score=0.8011 | name=Bella Italia | cuisine=Italian
ed ingredients. Reviews: Amazing pasta, felt like being in Rome | Great service and reasonable prices | Best carbonara outside Italy....
2. score=0.7358 | name=Bella Italia | cuisine=Italian
Restaurant ID: rest_001. Name: Bella Italia. Cuisine: Italian. Address: 123 Main Street, New York, NY 10001. Rating: 4.5. Description: Authe...
3. score=0.6477 | name=Le Petit Café | cuisine=French
h seafood. Reviews: Authentic French cooking at its best | Romantic atmosphere perfect for dates | Wine selection is excellent....
==========================================================================================
Query: spicy mexican tacos
------------------------------------------------------------------------------------------
1. score=0.8015 | name=Taco Fiesta | cuisine=Mexican
Restaurant ID: rest_005. Name: Taco Fiesta. Cuisine: Mexican. Address: 654 W 3rd Street, New York, NY 10014. Rating: 4.2. Description: Vibra...
2. score=0.7826 | name=Taco Fiesta | cuisine=Mexican
illas. Reviews: Best tacos in the city | Pork carnitas are incredible | Fresh margaritas and great music....
3. score=0.7181 | name=Taj Mahal | cuisine=Indian
ive spice selection. Reviews: Aromatic spices and tender meat | Best butter chicken around | Authentic flavors from North India....
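A note on reading these scores: they are probably not raw cosine similarities. The Atlas Vector Search documentation describes the cosine score as normalized into the range 0 to 1 using score = (1 + cosine) / 2; treat that formula as an assumption and verify it against the documentation for your Atlas version. Under that assumption, the sketch below converts between the two scales in plain Python. The function names are hypothetical.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Plain cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def atlas_style_score(cosine: float) -> float:
    """Assumed normalization: map cosine in [-1, 1] to a score in [0, 1]."""
    return (1.0 + cosine) / 2.0


def cosine_from_score(score: float) -> float:
    """Invert the assumed normalization."""
    return 2.0 * score - 1.0


# Orthogonal vectors have cosine 0, which maps to a score of 0.5
print(atlas_style_score(cosine_similarity([1.0, 0.0], [0.0, 1.0])))

# Top score for the sushi query, converted back to a cosine value
print(round(cosine_from_score(0.8485), 4))
```

If the assumption holds, a score of 0.5 corresponds to unrelated vectors, so the third-ranked results (around 0.65 to 0.72) are weaker matches than the raw numbers might suggest.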
8) Checkpoint: Validate Atlas Metadata Preservation
if not RUN_ATLAS_VECTOR_SEARCH:
    print("Checkpoint skipped. Enable RUN_ATLAS_VECTOR_SEARCH to inspect Atlas documents.")
else:
    sample_docs = list(
        atlas_coll.find(
            {},
            {
                "_id": 1,
                "source_id": 1,
                "chunk_index": 1,
                "cuisine": 1,
                "name": 1,
            },
        ).limit(5)
    )
    print("Retrieved sample metadata rows:")
    for i, d in enumerate(sample_docs, 1):
        print(f"{i}. source_id={d.get('source_id')} | chunk_index={d.get('chunk_index')} | cuisine={d.get('cuisine')}")
Retrieved sample metadata rows:
1. source_id=rest_001 | chunk_index=0 | cuisine=Italian
2. source_id=rest_001 | chunk_index=1 | cuisine=Italian
3. source_id=rest_002 | chunk_index=0 | cuisine=Chinese
4. source_id=rest_002 | chunk_index=1 | cuisine=Chinese
5. source_id=rest_003 | chunk_index=0 | cuisine=Japanese
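Eyeballing five rows confirms the fields exist, but not that every source has all of its chunks. A stricter check compares each source's chunk_index values with its num_chunks_for_source. The sketch below works on plain dictionaries; the helper name find_incomplete_sources and the demo rows are hypothetical. To use it in the notebook, the find projection would also need to include num_chunks_for_source, and the limit would be removed.

```python
from collections import defaultdict
from typing import Any, Dict, List


def find_incomplete_sources(docs: List[Dict[str, Any]]) -> Dict[str, str]:
    """Return {source_id: problem} for sources whose chunk indexes are not exactly 0..n-1."""
    by_source: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for doc in docs:
        by_source[doc["source_id"]].append(doc)

    problems: Dict[str, str] = {}
    for source_id, group in by_source.items():
        expected = group[0]["num_chunks_for_source"]
        indexes = sorted(d["chunk_index"] for d in group)
        if indexes != list(range(expected)):
            problems[source_id] = f"expected indexes 0..{expected - 1}, found {indexes}"
    return problems


# Made-up rows: rest_002 is missing its second chunk
demo_rows = [
    {"source_id": "rest_001", "chunk_index": 0, "num_chunks_for_source": 2},
    {"source_id": "rest_001", "chunk_index": 1, "num_chunks_for_source": 2},
    {"source_id": "rest_002", "chunk_index": 0, "num_chunks_for_source": 2},
]
problems = find_incomplete_sources(demo_rows)
print(problems)
```

An empty result means every source's chunks arrived intact, which is the property a retrieval step relies on when it stitches neighboring chunks back together.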
9) Exercises
- Increase MAX_RECORDS to 1000 (Atlas mode) and compare ingestion throughput.
- Change chunking to CHUNK_SIZE_CHARS=420 and CHUNK_OVERLAP_CHARS=80; compare retrieval quality.
- Add metadata filtering at query time (for example, only cuisine == "Japanese").
- Add a filter query for only one cuisine using a $match stage after $vectorSearch.
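As a starting point for the two filtering exercises, the sketch below builds both pipeline shapes as plain Python data, so it runs without a database. It rests on an understanding of Atlas Vector Search that should be checked against the current documentation: a $match stage after $vectorSearch filters only the rows already returned, so it can yield fewer than limit results, whereas the filter option inside $vectorSearch restricts candidates up front but requires the field to be declared as a "filter" field in the index definition. The function names and the FILTERABLE_INDEX_DEFINITION constant are hypothetical.

```python
from typing import Any, Dict, List


def post_filter_pipeline(
    query_vector: List[float], cuisine: str, index_name: str = "vector_index"
) -> List[Dict[str, Any]]:
    """Vector search first, then keep only rows of one cuisine (may return < limit rows)."""
    return [
        {
            "$vectorSearch": {
                "index": index_name,
                "path": "embedding",
                "queryVector": query_vector,
                "numCandidates": 30,
                "limit": 3,
            }
        },
        {"$match": {"cuisine": cuisine}},
    ]


def pre_filter_pipeline(
    query_vector: List[float], cuisine: str, index_name: str = "vector_index"
) -> List[Dict[str, Any]]:
    """Restrict candidates inside $vectorSearch (assumes cuisine is indexed as a filter field)."""
    return [
        {
            "$vectorSearch": {
                "index": index_name,
                "path": "embedding",
                "queryVector": query_vector,
                "numCandidates": 30,
                "limit": 3,
                "filter": {"cuisine": {"$eq": cuisine}},
            }
        }
    ]


# Index definition the pre-filter variant is assumed to need
FILTERABLE_INDEX_DEFINITION = {
    "fields": [
        {"type": "vector", "path": "embedding", "numDimensions": 384, "similarity": "cosine"},
        {"type": "filter", "path": "cuisine"},
    ]
}

placeholder_vector = [0.0] * 384  # stand-in for model.encode(query).tolist()
print(post_filter_pipeline(placeholder_vector, "Japanese")[1])
print(pre_filter_pipeline(placeholder_vector, "Japanese")[0]["$vectorSearch"]["filter"])
```

Either pipeline can be passed to atlas_coll.aggregate in place of the one in section 7, with a $project stage appended to surface the score.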
Key Takeaway
A robust RAG ingestion pipeline is not only embedding text. It is data normalization + chunking strategy + metadata retention + searchable indexing.
✅ Lab 4 complete: You built an end-to-end ingestion pipeline from real records to retrievable vector chunks.