Lab 7: Complete RAG Pipeline¶
Level: Advanced | Duration: 3 hours
Objective¶
Build and evaluate an end-to-end Retrieval-Augmented Generation system backed entirely by MongoDB Atlas.
What You'll Learn¶
- Connect all previous labs into one coherent pipeline
- Build multi-stage retrieval: Dense (Atlas
$vectorSearch) + Sparse (BM25) + Hybrid (RRF) - Implement a reranker to refine results before generation
- Augment an LLM prompt with retrieved context
- Evaluate with Relevance, Faithfulness, and Latency metrics
- Ingestion: Load and chunk documents (Lab 4)
- Embedding: Convert text to vectors (Lab 2)
- Storage: Index in vector database (Lab 3)
- Retrieval: Dense + Sparse + Hybrid (Lab 6)
- Reranking: Score and filter results
- Augmentation: Create context for LLM
- Generation: LLM writes final answer
- Evaluation: Measure quality and relevance
Evaluation Metrics¶
- Relevance: Are results related to query? (MRR, NDCG)
- Faithfulness: Do results support the answer?
- Latency: How fast is retrieval? (target: <500ms)
- Coverage: What percentage of queries have good results?
RAG Pipeline Stages¶
User Query
│
├─► Dense Retrieval (Atlas $vectorSearch) ─┐
├─► Sparse Retrieval (BM25 on cached docs) ─┤──► RRF Fusion ──► Rerank ──► Top-K Chunks
│ │
└─────────────────────────────────────────────┘
│
Prompt Builder
│
LLM (or Mock)
│
Final Answer + Evaluation
1) Imports and Configuration¶
In [1]:
Copied!
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
import json, os, time, math, re
from dataclasses import dataclass, field
from sentence_transformers import SentenceTransformer
from dotenv import load_dotenv
from rank_bm25 import BM25Okapi
try:
from pymongo import MongoClient
except ImportError:
MongoClient = None
# Load secrets from .env (ATLAS_URI, optionally OPENAI_API_KEY)
for _base in [Path.cwd(), *Path.cwd().parents]:
if (_base / ".env").exists():
load_dotenv(dotenv_path=_base / ".env", override=False)
break
# ─────────────────────────────────────────
# Configuration — edit these to match your setup
# ─────────────────────────────────────────
ATLAS_URI = os.getenv("ATLAS_URI", "").strip()
ATLAS_VECTOR_DB = "rag_lab"
ATLAS_VECTOR_COLL = "restaurant_chunks" # populated by Lab 4
ATLAS_VECTOR_INDEX = "vector_index" # 384-dim cosine index from Lab 4
EMBED_MODEL = "all-MiniLM-L6-v2"
TOP_K = 5 # candidates to retrieve per retrieval path
RRF_K = 60 # RRF constant (standard = 60)
FINAL_TOP_N = 3 # contexts fed to LLM
# LLM settings
USE_OPENAI = False # set True + add OPENAI_API_KEY to .env
OPENAI_MODEL = "gpt-3.5-turbo"
OPENAI_KEY = os.getenv("OPENAI_API_KEY", "")
print("Configuration loaded")
print(f"ATLAS_URI set: {bool(ATLAS_URI)}")
print(f"EMBED_MODEL: {EMBED_MODEL}")
print(f"USE_OPENAI: {USE_OPENAI}")
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
import json, os, time, math, re
from dataclasses import dataclass, field
from sentence_transformers import SentenceTransformer
from dotenv import load_dotenv
from rank_bm25 import BM25Okapi
try:
from pymongo import MongoClient
except ImportError:
MongoClient = None
# Load secrets from .env (ATLAS_URI, optionally OPENAI_API_KEY)
for _base in [Path.cwd(), *Path.cwd().parents]:
if (_base / ".env").exists():
load_dotenv(dotenv_path=_base / ".env", override=False)
break
# ─────────────────────────────────────────
# Configuration — edit these to match your setup
# ─────────────────────────────────────────
ATLAS_URI = os.getenv("ATLAS_URI", "").strip()
ATLAS_VECTOR_DB = "rag_lab"
ATLAS_VECTOR_COLL = "restaurant_chunks" # populated by Lab 4
ATLAS_VECTOR_INDEX = "vector_index" # 384-dim cosine index from Lab 4
EMBED_MODEL = "all-MiniLM-L6-v2"
TOP_K = 5 # candidates to retrieve per retrieval path
RRF_K = 60 # RRF constant (standard = 60)
FINAL_TOP_N = 3 # contexts fed to LLM
# LLM settings
USE_OPENAI = False # set True + add OPENAI_API_KEY to .env
OPENAI_MODEL = "gpt-3.5-turbo"
OPENAI_KEY = os.getenv("OPENAI_API_KEY", "")
print("Configuration loaded")
print(f"ATLAS_URI set: {bool(ATLAS_URI)}")
print(f"EMBED_MODEL: {EMBED_MODEL}")
print(f"USE_OPENAI: {USE_OPENAI}")
Configuration loaded ATLAS_URI set: True EMBED_MODEL: all-MiniLM-L6-v2 USE_OPENAI: False
2) Connect to Atlas and Load Corpus Cache¶
In [2]:
Copied!
if not ATLAS_URI:
raise ValueError("ATLAS_URI not set. Add it to your .env file.")
if MongoClient is None:
raise ImportError("pymongo not installed. Run: pip install pymongo")
mongo_client = MongoClient(ATLAS_URI, serverSelectionTimeoutMS=20000)
mongo_client.admin.command("ping")
atlas_coll = mongo_client[ATLAS_VECTOR_DB][ATLAS_VECTOR_COLL]
count = atlas_coll.count_documents({})
print(f"✅ Connected to Atlas — {count} chunks in {ATLAS_VECTOR_DB}.{ATLAS_VECTOR_COLL}")
if count == 0:
raise RuntimeError(
"Collection is empty. Run Lab 4 first to ingest restaurant chunks."
)
# Load all chunk texts into memory for BM25 (sparse retrieval)
print("Loading corpus for BM25 index...")
corpus_docs = list(
atlas_coll.find(
{},
{"_id": 1, "text": 1, "name": 1, "cuisine": 1, "source_id": 1, "chunk_index": 1},
)
)
print(f"Loaded {len(corpus_docs)} docs into local BM25 corpus")
if not ATLAS_URI:
raise ValueError("ATLAS_URI not set. Add it to your .env file.")
if MongoClient is None:
raise ImportError("pymongo not installed. Run: pip install pymongo")
mongo_client = MongoClient(ATLAS_URI, serverSelectionTimeoutMS=20000)
mongo_client.admin.command("ping")
atlas_coll = mongo_client[ATLAS_VECTOR_DB][ATLAS_VECTOR_COLL]
count = atlas_coll.count_documents({})
print(f"✅ Connected to Atlas — {count} chunks in {ATLAS_VECTOR_DB}.{ATLAS_VECTOR_COLL}")
if count == 0:
raise RuntimeError(
"Collection is empty. Run Lab 4 first to ingest restaurant chunks."
)
# Load all chunk texts into memory for BM25 (sparse retrieval)
print("Loading corpus for BM25 index...")
corpus_docs = list(
atlas_coll.find(
{},
{"_id": 1, "text": 1, "name": 1, "cuisine": 1, "source_id": 1, "chunk_index": 1},
)
)
print(f"Loaded {len(corpus_docs)} docs into local BM25 corpus")
✅ Connected to Atlas — 20 chunks in rag_lab.restaurant_chunks Loading corpus for BM25 index... Loaded 20 docs into local BM25 corpus
3) Build Retrieval Components¶
In [3]:
Copied!
# ── Dense retrieval via Atlas $vectorSearch ──────────────────────────────────
embed_model = SentenceTransformer(EMBED_MODEL)
def dense_retrieve(query: str, k: int = TOP_K) -> List[Dict]:
q_emb = embed_model.encode(query).tolist()
pipeline = [
{
"$vectorSearch": {
"index": ATLAS_VECTOR_INDEX,
"path": "embedding",
"queryVector": q_emb,
"numCandidates": k * 4,
"limit": k,
}
},
{
"$project": {
"_id": 1, "text": 1, "name": 1,
"cuisine": 1, "source_id": 1, "chunk_index": 1,
"score": {"$meta": "vectorSearchScore"},
}
},
]
results = list(atlas_coll.aggregate(pipeline))
for r in results:
r["retrieval"] = "dense"
return results
# ── Sparse retrieval via BM25 ─────────────────────────────────────────────────
tokenize = lambda t: re.sub(r"[^\w\s]", "", t.lower()).split()
bm25_corpus = [tokenize(d["text"]) for d in corpus_docs]
bm25 = BM25Okapi(bm25_corpus)
def sparse_retrieve(query: str, k: int = TOP_K) -> List[Dict]:
tokens = tokenize(query)
scores = bm25.get_scores(tokens)
top_idx = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:k]
results = []
for i in top_idx:
doc = corpus_docs[i].copy()
doc["score"] = float(scores[i])
doc["retrieval"] = "sparse"
results.append(doc)
return results
# ── Reciprocal Rank Fusion ────────────────────────────────────────────────────
def rrf_fuse(
ranked_lists: List[List[Dict]],
k: int = RRF_K,
) -> List[Dict]:
scores: Dict[str, float] = {}
docs_by_id: Dict[str, Dict] = {}
for ranked in ranked_lists:
for rank, doc in enumerate(ranked, start=1):
doc_id = str(doc["_id"])
scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
docs_by_id[doc_id] = doc
fused = sorted(docs_by_id.values(), key=lambda d: scores[str(d["_id"])], reverse=True)
for doc in fused:
doc["rrf_score"] = scores[str(doc["_id"])]
return fused
# ── Simple cosine reranker ────────────────────────────────────────────────────
def rerank(query: str, docs: List[Dict], top_n: int = FINAL_TOP_N) -> List[Dict]:
if not docs:
return docs
q_emb = embed_model.encode(query)
texts = [d["text"] for d in docs]
d_embs = embed_model.encode(texts)
import numpy as np
sims = d_embs @ q_emb / (
(np.linalg.norm(d_embs, axis=1) * np.linalg.norm(q_emb)) + 1e-9
)
ranked = sorted(zip(sims, docs), key=lambda x: x[0], reverse=True)
for sim, doc in ranked:
doc["rerank_score"] = float(sim)
return [doc for _, doc in ranked[:top_n]]
print("✅ Retrieval components ready (dense, sparse BM25, RRF, reranker)")
# ── Dense retrieval via Atlas $vectorSearch ──────────────────────────────────
embed_model = SentenceTransformer(EMBED_MODEL)
def dense_retrieve(query: str, k: int = TOP_K) -> List[Dict]:
q_emb = embed_model.encode(query).tolist()
pipeline = [
{
"$vectorSearch": {
"index": ATLAS_VECTOR_INDEX,
"path": "embedding",
"queryVector": q_emb,
"numCandidates": k * 4,
"limit": k,
}
},
{
"$project": {
"_id": 1, "text": 1, "name": 1,
"cuisine": 1, "source_id": 1, "chunk_index": 1,
"score": {"$meta": "vectorSearchScore"},
}
},
]
results = list(atlas_coll.aggregate(pipeline))
for r in results:
r["retrieval"] = "dense"
return results
# ── Sparse retrieval via BM25 ─────────────────────────────────────────────────
tokenize = lambda t: re.sub(r"[^\w\s]", "", t.lower()).split()
bm25_corpus = [tokenize(d["text"]) for d in corpus_docs]
bm25 = BM25Okapi(bm25_corpus)
def sparse_retrieve(query: str, k: int = TOP_K) -> List[Dict]:
tokens = tokenize(query)
scores = bm25.get_scores(tokens)
top_idx = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:k]
results = []
for i in top_idx:
doc = corpus_docs[i].copy()
doc["score"] = float(scores[i])
doc["retrieval"] = "sparse"
results.append(doc)
return results
# ── Reciprocal Rank Fusion ────────────────────────────────────────────────────
def rrf_fuse(
ranked_lists: List[List[Dict]],
k: int = RRF_K,
) -> List[Dict]:
scores: Dict[str, float] = {}
docs_by_id: Dict[str, Dict] = {}
for ranked in ranked_lists:
for rank, doc in enumerate(ranked, start=1):
doc_id = str(doc["_id"])
scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
docs_by_id[doc_id] = doc
fused = sorted(docs_by_id.values(), key=lambda d: scores[str(d["_id"])], reverse=True)
for doc in fused:
doc["rrf_score"] = scores[str(doc["_id"])]
return fused
# ── Simple cosine reranker ────────────────────────────────────────────────────
def rerank(query: str, docs: List[Dict], top_n: int = FINAL_TOP_N) -> List[Dict]:
if not docs:
return docs
q_emb = embed_model.encode(query)
texts = [d["text"] for d in docs]
d_embs = embed_model.encode(texts)
import numpy as np
sims = d_embs @ q_emb / (
(np.linalg.norm(d_embs, axis=1) * np.linalg.norm(q_emb)) + 1e-9
)
ranked = sorted(zip(sims, docs), key=lambda x: x[0], reverse=True)
for sim, doc in ranked:
doc["rerank_score"] = float(sim)
return [doc for _, doc in ranked[:top_n]]
print("✅ Retrieval components ready (dense, sparse BM25, RRF, reranker)")
Warning: You are sending unauthenticated requests to the HF Hub. Please set a HF_TOKEN to enable higher rate limits and faster downloads.
Loading weights: 0%| | 0/103 [00:00<?, ?it/s]
✅ Retrieval components ready (dense, sparse BM25, RRF, reranker)
4) Prompt Builder¶
In [6]:
Copied!
def build_prompt(query: str, context_docs: List[Dict]) -> str:
context_lines = []
for i, doc in enumerate(context_docs, 1):
name = doc.get("name", "Unknown")
cuisine = doc.get("cuisine", "")
text = doc.get("text", "")
context_lines.append(f"[{i}] {name} ({cuisine}): {text}")
context = "\n\n".join(context_lines)
prompt = f"""You are a helpful restaurant assistant. Answer the user's question using ONLY the provided context.
If the context does not contain enough information, say "I don't have enough information."
Context:
{context}
Question: {query}
Answer:"""
return prompt
def build_prompt(query: str, context_docs: List[Dict]) -> str:
context_lines = []
for i, doc in enumerate(context_docs, 1):
name = doc.get("name", "Unknown")
cuisine = doc.get("cuisine", "")
text = doc.get("text", "")
context_lines.append(f"[{i}] {name} ({cuisine}): {text}")
context = "\n\n".join(context_lines)
prompt = f"""You are a helpful restaurant assistant. Answer the user's question using ONLY the provided context.
If the context does not contain enough information, say "I don't have enough information."
Context:
{context}
Question: {query}
Answer:"""
return prompt
5) LLM Generation (Mock + Optional OpenAI)¶
In [7]:
Copied!
def generate_mock(prompt: str, context_docs: List[Dict], query: str) -> str:
"""Rule-based mock answer — runs offline, no API key needed."""
names = [d.get("name", "a restaurant") for d in context_docs]
cuisines = list({d.get("cuisine", "") for d in context_docs if d.get("cuisine")})
return (
f"Based on the retrieved information, '{names[0]}' is a great match for your query: '{query}'. "
f"It offers {cuisines[0] if cuisines else 'various'} cuisine. "
f"Other relevant options include: {', '.join(names[1:]) if len(names) > 1 else 'none found'}."
)
def generate_openai(prompt: str) -> str:
try:
import openai
openai.api_key = OPENAI_KEY
resp = openai.chat.completions.create(
model=OPENAI_MODEL,
messages=[{"role": "user", "content": prompt}],
temperature=0.2,
max_tokens=300,
)
return resp.choices[0].message.content.strip()
except Exception as e:
return f"[OpenAI error: {e}]"
def generate(query: str, context_docs: List[Dict]) -> str:
prompt = build_prompt(query, context_docs)
if USE_OPENAI and OPENAI_KEY:
return generate_openai(prompt)
return generate_mock(prompt, context_docs, query)
def generate_mock(prompt: str, context_docs: List[Dict], query: str) -> str:
"""Rule-based mock answer — runs offline, no API key needed."""
names = [d.get("name", "a restaurant") for d in context_docs]
cuisines = list({d.get("cuisine", "") for d in context_docs if d.get("cuisine")})
return (
f"Based on the retrieved information, '{names[0]}' is a great match for your query: '{query}'. "
f"It offers {cuisines[0] if cuisines else 'various'} cuisine. "
f"Other relevant options include: {', '.join(names[1:]) if len(names) > 1 else 'none found'}."
)
def generate_openai(prompt: str) -> str:
try:
import openai
openai.api_key = OPENAI_KEY
resp = openai.chat.completions.create(
model=OPENAI_MODEL,
messages=[{"role": "user", "content": prompt}],
temperature=0.2,
max_tokens=300,
)
return resp.choices[0].message.content.strip()
except Exception as e:
return f"[OpenAI error: {e}]"
def generate(query: str, context_docs: List[Dict]) -> str:
prompt = build_prompt(query, context_docs)
if USE_OPENAI and OPENAI_KEY:
return generate_openai(prompt)
return generate_mock(prompt, context_docs, query)
6) Evaluation Metrics¶
In [8]:
Copied!
def reciprocal_rank(results: List[Dict], relevant_ids: List[str]) -> float:
"""MRR: 1/rank of first relevant hit. 0 if none found."""
for rank, doc in enumerate(results, 1):
if str(doc.get("source_id", doc.get("_id", ""))) in relevant_ids:
return 1.0 / rank
return 0.0
def ndcg_at_k(results: List[Dict], relevant_ids: List[str], k: int = FINAL_TOP_N) -> float:
"""NDCG@k: measures ranking quality of relevant results."""
def dcg(hits):
return sum((2**h - 1) / math.log2(i + 2) for i, h in enumerate(hits))
hits = [1 if str(r.get("source_id", r.get("_id", ""))) in relevant_ids else 0
for r in results[:k]]
ideal_hits = sorted(hits, reverse=True)
d = dcg(ideal_hits)
return dcg(hits) / d if d > 0 else 0.0
def faithfulness_score(answer: str, context_docs: List[Dict]) -> float:
"""Rough faithfulness: fraction of answer words found in context."""
context_text = " ".join(d.get("text", "") for d in context_docs).lower()
answer_words = set(re.sub(r"[^\w\s]", "", answer.lower()).split())
if not answer_words:
return 0.0
found = sum(1 for w in answer_words if w in context_text)
return found / len(answer_words)
def reciprocal_rank(results: List[Dict], relevant_ids: List[str]) -> float:
"""MRR: 1/rank of first relevant hit. 0 if none found."""
for rank, doc in enumerate(results, 1):
if str(doc.get("source_id", doc.get("_id", ""))) in relevant_ids:
return 1.0 / rank
return 0.0
def ndcg_at_k(results: List[Dict], relevant_ids: List[str], k: int = FINAL_TOP_N) -> float:
"""NDCG@k: measures ranking quality of relevant results."""
def dcg(hits):
return sum((2**h - 1) / math.log2(i + 2) for i, h in enumerate(hits))
hits = [1 if str(r.get("source_id", r.get("_id", ""))) in relevant_ids else 0
for r in results[:k]]
ideal_hits = sorted(hits, reverse=True)
d = dcg(ideal_hits)
return dcg(hits) / d if d > 0 else 0.0
def faithfulness_score(answer: str, context_docs: List[Dict]) -> float:
"""Rough faithfulness: fraction of answer words found in context."""
context_text = " ".join(d.get("text", "") for d in context_docs).lower()
answer_words = set(re.sub(r"[^\w\s]", "", answer.lower()).split())
if not answer_words:
return 0.0
found = sum(1 for w in answer_words if w in context_text)
return found / len(answer_words)
7) Full RAG Pipeline Function¶
In [9]:
Copied!
@dataclass
class RAGResult:
query: str
dense_results: List[Dict]
sparse_results: List[Dict]
fused_results: List[Dict]
reranked: List[Dict]
answer: str
latency_ms: float
mrr: float = 0.0
ndcg: float = 0.0
faithfulness: float = 0.0
relevant_ids: List[str] = field(default_factory=list)
def rag_pipeline(query: str, relevant_ids: Optional[List[str]] = None) -> RAGResult:
t0 = time.time()
# Stage 1 — Retrieve
dense = dense_retrieve(query, k=TOP_K)
sparse = sparse_retrieve(query, k=TOP_K)
# Stage 2 — Fuse
fused = rrf_fuse([dense, sparse])
# Stage 3 — Rerank
reranked = rerank(query, fused, top_n=FINAL_TOP_N)
# Stage 4 — Generate
answer = generate(query, reranked)
latency = (time.time() - t0) * 1000
# Stage 5 — Evaluate
rel_ids = relevant_ids or []
mrr = reciprocal_rank(reranked, rel_ids)
ndcg = ndcg_at_k(reranked, rel_ids)
faith = faithfulness_score(answer, reranked)
return RAGResult(
query=query,
dense_results=dense,
sparse_results=sparse,
fused_results=fused,
reranked=reranked,
answer=answer,
latency_ms=latency,
mrr=mrr,
ndcg=ndcg,
faithfulness=faith,
relevant_ids=rel_ids,
)
print("✅ RAG pipeline function ready")
@dataclass
class RAGResult:
query: str
dense_results: List[Dict]
sparse_results: List[Dict]
fused_results: List[Dict]
reranked: List[Dict]
answer: str
latency_ms: float
mrr: float = 0.0
ndcg: float = 0.0
faithfulness: float = 0.0
relevant_ids: List[str] = field(default_factory=list)
def rag_pipeline(query: str, relevant_ids: Optional[List[str]] = None) -> RAGResult:
t0 = time.time()
# Stage 1 — Retrieve
dense = dense_retrieve(query, k=TOP_K)
sparse = sparse_retrieve(query, k=TOP_K)
# Stage 2 — Fuse
fused = rrf_fuse([dense, sparse])
# Stage 3 — Rerank
reranked = rerank(query, fused, top_n=FINAL_TOP_N)
# Stage 4 — Generate
answer = generate(query, reranked)
latency = (time.time() - t0) * 1000
# Stage 5 — Evaluate
rel_ids = relevant_ids or []
mrr = reciprocal_rank(reranked, rel_ids)
ndcg = ndcg_at_k(reranked, rel_ids)
faith = faithfulness_score(answer, reranked)
return RAGResult(
query=query,
dense_results=dense,
sparse_results=sparse,
fused_results=fused,
reranked=reranked,
answer=answer,
latency_ms=latency,
mrr=mrr,
ndcg=ndcg,
faithfulness=faith,
relevant_ids=rel_ids,
)
print("✅ RAG pipeline function ready")
✅ RAG pipeline function ready
8) Run the Pipeline on Sample Queries¶
In [10]:
Copied!
# (relevant_ids are the ground-truth source IDs we expect each query to retrieve)
test_queries = [
{"query": "best sushi omakase in New York", "relevant_ids": ["rest_003"]},
{"query": "authentic Italian pasta and risotto", "relevant_ids": ["rest_001"]},
{"query": "spicy Mexican tacos and margaritas", "relevant_ids": ["rest_005"]},
{"query": "Korean BBQ with table grill", "relevant_ids": ["rest_009"]},
{"query": "plant-based vegan restaurant", "relevant_ids": ["rest_008"]},
]
results = []
for tq in test_queries:
r = rag_pipeline(tq["query"], relevant_ids=tq["relevant_ids"])
results.append(r)
print("\n" + "=" * 90)
print(f"Query: {r.query}")
print(f"Latency: {r.latency_ms:.0f} ms")
print("-" * 90)
print("Top retrieved chunks:")
for i, doc in enumerate(r.reranked, 1):
print(f" {i}. [{doc.get('retrieval','hybrid')}] {doc.get('name')} | "
f"cuisine={doc.get('cuisine')} | "
f"rerank={doc.get('rerank_score', 0):.3f}")
print(f" {doc.get('text','')[:120]}...")
print(f"\nAnswer:\n {r.answer}")
print(f"\nMetrics: MRR={r.mrr:.2f} NDCG@{FINAL_TOP_N}={r.ndcg:.2f} Faithfulness={r.faithfulness:.2f}")
# (relevant_ids are the ground-truth source IDs we expect each query to retrieve)
test_queries = [
{"query": "best sushi omakase in New York", "relevant_ids": ["rest_003"]},
{"query": "authentic Italian pasta and risotto", "relevant_ids": ["rest_001"]},
{"query": "spicy Mexican tacos and margaritas", "relevant_ids": ["rest_005"]},
{"query": "Korean BBQ with table grill", "relevant_ids": ["rest_009"]},
{"query": "plant-based vegan restaurant", "relevant_ids": ["rest_008"]},
]
results = []
for tq in test_queries:
r = rag_pipeline(tq["query"], relevant_ids=tq["relevant_ids"])
results.append(r)
print("\n" + "=" * 90)
print(f"Query: {r.query}")
print(f"Latency: {r.latency_ms:.0f} ms")
print("-" * 90)
print("Top retrieved chunks:")
for i, doc in enumerate(r.reranked, 1):
print(f" {i}. [{doc.get('retrieval','hybrid')}] {doc.get('name')} | "
f"cuisine={doc.get('cuisine')} | "
f"rerank={doc.get('rerank_score', 0):.3f}")
print(f" {doc.get('text','')[:120]}...")
print(f"\nAnswer:\n {r.answer}")
print(f"\nMetrics: MRR={r.mrr:.2f} NDCG@{FINAL_TOP_N}={r.ndcg:.2f} Faithfulness={r.faithfulness:.2f}")
==========================================================================================
Query: best sushi omakase in New York
Latency: 3298 ms
------------------------------------------------------------------------------------------
Top retrieved chunks:
1. [sparse] Sakura Sushi | cuisine=Japanese | rerank=0.718
Restaurant ID: rest_003. Name: Sakura Sushi. Cuisine: Japanese. Address: 789 5th Avenue, New York, NY 10022. Rating: 4.8...
2. [sparse] Sakura Sushi | cuisine=Japanese | rerank=0.718
tions. Reviews: Best sushi experience in New York | Incredible omakase selection | Chef really knows his craft....
3. [dense] Dragon Palace | cuisine=Chinese | rerank=0.427
seafood daily. Reviews: Freshest seafood in Chinatown | Dim sum is absolutely delicious | Busy but worth the wait....
Answer:
Based on the retrieved information, 'Sakura Sushi' is a great match for your query: 'best sushi omakase in New York'. It offers Japanese cuisine. Other relevant options include: Sakura Sushi, Dragon Palace.
Metrics: MRR=1.00 NDCG@3=1.00 Faithfulness=0.48
==========================================================================================
Query: authentic Italian pasta and risotto
Latency: 652 ms
------------------------------------------------------------------------------------------
Top retrieved chunks:
1. [sparse] Bella Italia | cuisine=Italian | rerank=0.599
ed ingredients. Reviews: Amazing pasta, felt like being in Rome | Great service and reasonable prices | Best carbonara o...
2. [sparse] Bella Italia | cuisine=Italian | rerank=0.478
Restaurant ID: rest_001. Name: Bella Italia. Cuisine: Italian. Address: 123 Main Street, New York, NY 10001. Rating: 4.5...
3. [dense] Le Petit Café | cuisine=French | rerank=0.346
h seafood. Reviews: Authentic French cooking at its best | Romantic atmosphere perfect for dates | Wine selection is exc...
Answer:
Based on the retrieved information, 'Bella Italia' is a great match for your query: 'authentic Italian pasta and risotto'. It offers Italian cuisine. Other relevant options include: Bella Italia, Le Petit Café.
Metrics: MRR=1.00 NDCG@3=1.00 Faithfulness=0.55
==========================================================================================
Query: spicy Mexican tacos and margaritas
Latency: 160 ms
------------------------------------------------------------------------------------------
Top retrieved chunks:
1. [sparse] Taco Fiesta | cuisine=Mexican | rerank=0.580
illas. Reviews: Best tacos in the city | Pork carnitas are incredible | Fresh margaritas and great music....
2. [sparse] Taco Fiesta | cuisine=Mexican | rerank=0.522
Restaurant ID: rest_005. Name: Taco Fiesta. Cuisine: Mexican. Address: 654 W 3rd Street, New York, NY 10014. Rating: 4.2...
3. [dense] Taj Mahal | cuisine=Indian | rerank=0.407
ive spice selection. Reviews: Aromatic spices and tender meat | Best butter chicken around | Authentic flavors from Nort...
Answer:
Based on the retrieved information, 'Taco Fiesta' is a great match for your query: 'spicy Mexican tacos and margaritas'. It offers Indian cuisine. Other relevant options include: Taco Fiesta, Taj Mahal.
Metrics: MRR=1.00 NDCG@3=1.00 Faithfulness=0.45
==========================================================================================
Query: Korean BBQ with table grill
Latency: 501 ms
------------------------------------------------------------------------------------------
Top retrieved chunks:
1. [sparse] Seoul Kitchen | cuisine=Korean | rerank=0.646
Restaurant ID: rest_009. Name: Seoul Kitchen. Cuisine: Korean. Address: 369 8th Avenue, New York, NY 10018. Rating: 4.5....
2. [sparse] Seoul Kitchen | cuisine=Korean | rerank=0.634
Reviews: Interactive Korean BBQ experience | Marinated beef is tender and flavorful | Fun atmosphere with friends....
3. [sparse] Grill House Prime | cuisine=Steakhouse | rerank=0.372
Restaurant ID: rest_007. Name: Grill House Prime. Cuisine: Steakhouse. Address: 159 Park Avenue, New York, NY 10154. Rat...
Answer:
Based on the retrieved information, 'Seoul Kitchen' is a great match for your query: 'Korean BBQ with table grill'. It offers Korean cuisine. Other relevant options include: Seoul Kitchen, Grill House Prime.
Metrics: MRR=1.00 NDCG@3=1.00 Faithfulness=0.50
==========================================================================================
Query: plant-based vegan restaurant
Latency: 275 ms
------------------------------------------------------------------------------------------
Top retrieved chunks:
1. [sparse] Garden Vegan | cuisine=Vegan | rerank=0.649
Restaurant ID: rest_008. Name: Garden Vegan. Cuisine: Vegan. Address: 246 Washington Square West, New York, NY 10011. Ra...
2. [sparse] Garden Vegan | cuisine=Vegan | rerank=0.515
ocal organic produce. Reviews: Delicious vegan burgers and bowls | Even meat-eaters will enjoy | Great for dietary restr...
3. [dense] Oasis Mediterranean | cuisine=Mediterranean | rerank=0.381
fresh herbs and olive oil. Reviews: Fresh ingredients and bright flavors | Great hummus and tzatziki | Outdoor seating i...
Answer:
Based on the retrieved information, 'Garden Vegan' is a great match for your query: 'plant-based vegan restaurant'. It offers Vegan cuisine. Other relevant options include: Garden Vegan, Oasis Mediterranean.
Metrics: MRR=1.00 NDCG@3=1.00 Faithfulness=0.44
9) Evaluation Dashboard¶
In [11]:
Copied!
print("\n" + "=" * 90)
print(f"{'EVALUATION DASHBOARD':^90}")
print("=" * 90)
print(f"{'Query':<45} {'Latency':>9} {'MRR':>6} {'NDCG':>6} {'Faith':>7}")
print("-" * 90)
avg_lat = avg_mrr = avg_ndcg = avg_faith = 0.0
for r in results:
print(f"{r.query[:44]:<45} {r.latency_ms:>8.0f}ms {r.mrr:>6.2f} {r.ndcg:>6.2f} {r.faithfulness:>7.2f}")
avg_lat += r.latency_ms
avg_mrr += r.mrr
avg_ndcg += r.ndcg
avg_faith += r.faithfulness
n = len(results)
print("-" * 90)
print(f"{'AVERAGE':<45} {avg_lat/n:>8.0f}ms {avg_mrr/n:>6.2f} {avg_ndcg/n:>6.2f} {avg_faith/n:>7.2f}")
print("=" * 90)
# Pass / Fail guidance
print("\nPipeline Health:")
print(f" Latency < 500ms : {'✅ PASS' if avg_lat/n < 500 else '⚠️ SLOW'}")
print(f" Avg MRR > 0.5 : {'✅ PASS' if avg_mrr/n > 0.5 else '⚠️ LOW'}")
print(f" NDCG > 0.5 : {'✅ PASS' if avg_ndcg/n > 0.5 else '⚠️ LOW'}")
print(f" Faithful > 0.3 : {'✅ PASS' if avg_faith/n > 0.3 else '⚠️ LOW'}")
print("\n" + "=" * 90)
print(f"{'EVALUATION DASHBOARD':^90}")
print("=" * 90)
print(f"{'Query':<45} {'Latency':>9} {'MRR':>6} {'NDCG':>6} {'Faith':>7}")
print("-" * 90)
avg_lat = avg_mrr = avg_ndcg = avg_faith = 0.0
for r in results:
print(f"{r.query[:44]:<45} {r.latency_ms:>8.0f}ms {r.mrr:>6.2f} {r.ndcg:>6.2f} {r.faithfulness:>7.2f}")
avg_lat += r.latency_ms
avg_mrr += r.mrr
avg_ndcg += r.ndcg
avg_faith += r.faithfulness
n = len(results)
print("-" * 90)
print(f"{'AVERAGE':<45} {avg_lat/n:>8.0f}ms {avg_mrr/n:>6.2f} {avg_ndcg/n:>6.2f} {avg_faith/n:>7.2f}")
print("=" * 90)
# Pass / Fail guidance
print("\nPipeline Health:")
print(f" Latency < 500ms : {'✅ PASS' if avg_lat/n < 500 else '⚠️ SLOW'}")
print(f" Avg MRR > 0.5 : {'✅ PASS' if avg_mrr/n > 0.5 else '⚠️ LOW'}")
print(f" NDCG > 0.5 : {'✅ PASS' if avg_ndcg/n > 0.5 else '⚠️ LOW'}")
print(f" Faithful > 0.3 : {'✅ PASS' if avg_faith/n > 0.3 else '⚠️ LOW'}")
==========================================================================================
EVALUATION DASHBOARD
==========================================================================================
Query Latency MRR NDCG Faith
------------------------------------------------------------------------------------------
best sushi omakase in New York 3298ms 1.00 1.00 0.48
authentic Italian pasta and risotto 652ms 1.00 1.00 0.55
spicy Mexican tacos and margaritas 160ms 1.00 1.00 0.45
Korean BBQ with table grill 501ms 1.00 1.00 0.50
plant-based vegan restaurant 275ms 1.00 1.00 0.44
------------------------------------------------------------------------------------------
AVERAGE 977ms 1.00 1.00 0.48
==========================================================================================
Pipeline Health:
Latency < 500ms : ⚠️ SLOW
Avg MRR > 0.5 : ✅ PASS
NDCG > 0.5 : ✅ PASS
Faithful > 0.3 : ✅ PASS
10) Manual Ad-Hoc Query¶
In [12]:
Copied!
# Change this query to anything you like and re-run the cell
my_query = "romantic dinner with wine and French food"
r = rag_pipeline(my_query)
print(f"Query: {my_query}")
print(f"Latency: {r.latency_ms:.0f} ms\n")
print("Retrieved chunks:")
for i, doc in enumerate(r.reranked, 1):
print(f" {i}. {doc.get('name')} ({doc.get('cuisine')}) — rerank={doc.get('rerank_score',0):.3f}")
print(f" {doc.get('text','')[:130]}...")
print(f"\nAnswer:\n{r.answer}")
# Change this query to anything you like and re-run the cell
my_query = "romantic dinner with wine and French food"
r = rag_pipeline(my_query)
print(f"Query: {my_query}")
print(f"Latency: {r.latency_ms:.0f} ms\n")
print("Retrieved chunks:")
for i, doc in enumerate(r.reranked, 1):
print(f" {i}. {doc.get('name')} ({doc.get('cuisine')}) — rerank={doc.get('rerank_score',0):.3f}")
print(f" {doc.get('text','')[:130]}...")
print(f"\nAnswer:\n{r.answer}")
Query: romantic dinner with wine and French food
Latency: 506 ms
Retrieved chunks:
1. Le Petit Café (French) — rerank=0.477
h seafood. Reviews: Authentic French cooking at its best | Romantic atmosphere perfect for dates | Wine selection is excellent....
2. Le Petit Café (French) — rerank=0.404
Restaurant ID: rest_004. Name: Le Petit Café. Cuisine: French. Address: 321 6th Avenue, New York, NY 10014. Rating: 4.6. Descripti...
3. Grill House Prime (Steakhouse) — rerank=0.346
wine collection. Reviews: Best steak in Manhattan | Impeccable service and atmosphere | Worth every penny for special occasions....
Answer:
Based on the retrieved information, 'Le Petit Café' is a great match for your query: 'romantic dinner with wine and French food'. It offers French cuisine. Other relevant options include: Le Petit Café, Grill House Prime.
11) How to Enable Real LLM (Optional)¶
To use OpenAI GPT instead of the mock:
echo 'OPENAI_API_KEY="sk-..."' >> .env
Then in cell 1 set:
USE_OPENAI = True
The pipeline automatically routes to OpenAI when USE_OPENAI=True and OPENAI_API_KEY is set.
12) Exercises¶
- Try changing
TOP_Kfrom 5 to 10. Does retrieval quality improve? - Set
RRF_K = 10instead of 60. How does ranking change? - Add a metadata filter to
dense_retrieve— only retrievecuisine == "Japanese". - Enable OpenAI and compare mock vs real LLM answers.
- Add a 6th test query about steakhouse — predict the expected
relevant_idand verify.
Key Takeaway¶
A complete RAG pipeline is: chunk → embed → index → dense+sparse retrieve → fuse → rerank → prompt → generate → evaluate
✅ Lab 7 complete. You built an end-to-end RAG system on MongoDB Atlas with evaluation metrics.