RAG Code Indexing Pipeline

Level: Intermediate
Pre-reading: 00 · Demo Overview, 03.01 · RAG Pipeline, 01 · AWS Infra

This document covers the pipeline that ingests the taskmaster codebase into a pgvector store so the agent can retrieve contextually relevant code when resolving JIRA tickets.


Why RAG the Codebase?

When the agent receives TASK-101 ("Fix NullPointerException in TaskService"), it doesn't grep the whole repo — it retrieves the most semantically similar code chunks:

Query: "NullPointerException assignee field TaskService"
  →  taskmaster-core/src/.../TaskService.java (score: 0.93)
  →  taskmaster-core/src/.../Task.java        (score: 0.87)
  →  TaskServiceTest.java                     (score: 0.81)

This gives the LLM precise context without blowing the context window with the entire repo.


Indexing Architecture

graph LR
    GH["GitHub<br/>taskmaster repo"] -->|clone| Ingestion["Ingestion Lambda<br/>or local script"]
    Ingestion -->|chunk Java/TS files| Chunker["Chunker<br/>Function-level splitter"]
    Chunker -->|text chunks| Bedrock["Amazon Bedrock<br/>Titan Embeddings V2<br/>1024 dims"]
    Bedrock -->|vectors| PGV["RDS pgvector<br/>code_chunks table"]
    GH -->|push event| WebHook["GitHub Webhook<br/>→ Lambda trigger"]
    WebHook -->|re-index changed files| Ingestion

1. Chunking Strategy

Java files are chunked at the method level (plus one class-level chunk per file) and TypeScript files at the test-block level, rather than by fixed line count. This keeps each chunk semantically coherent.

import re
from dataclasses import dataclass
from typing import Generator

@dataclass
class CodeChunk:
    file_path: str
    module: str
    language: str
    chunk_text: str
    start_line: int
    end_line: int

def detect_module(file_path: str) -> str:
    """Infer module from file path."""
    if 'taskmaster-core' in file_path:
        return 'taskmaster-core'
    elif 'taskmaster-api' in file_path:
        return 'taskmaster-api'
    elif 'taskmaster-e2e' in file_path:
        return 'taskmaster-e2e'
    return 'unknown'

def chunk_java_file(file_path: str, source: str) -> Generator[CodeChunk, None, None]:
    """Split a Java file into class-level and method-level chunks."""
    module = detect_module(file_path)
    lines = source.splitlines()

    # Yield the full class as one chunk (for class-level context)
    yield CodeChunk(
        file_path=file_path,
        module=module,
        language='java',
        chunk_text=source[:4000],  # cap at ~4000 chars
        start_line=1,
        end_line=len(lines)
    )

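    # Yield each method as its own chunk.
    # Limitation: the pattern needs at least one modifier and a single-word return
    # type, so constructors, package-private methods, and generic or array return
    # types (e.g. List<Task>) are not matched.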
    method_pattern = re.compile(
        r'((?:(?:public|private|protected|static|final|synchronized)\s+)+)'
        r'(\w+)\s+(\w+)\s*\([^)]*\)\s*(?:throws\s+[\w,\s]+)?\s*\{',
        re.MULTILINE
    )
    for match in method_pattern.finditer(source):
        start = source.rfind('\n', 0, match.start()) + 1
        # Find matching closing brace
        depth = 0
        end = match.start()
        for i, ch in enumerate(source[match.start():], match.start()):
            if ch == '{':
                depth += 1
            elif ch == '}':
                depth -= 1
                if depth == 0:
                    end = i + 1
                    break

        chunk_text = source[start:end]
        if len(chunk_text) > 100:  # skip trivial getters/setters
            yield CodeChunk(
                file_path=file_path,
                module=module,
                language='java',
                chunk_text=chunk_text[:3000],
                start_line=source[:start].count('\n') + 1,
                end_line=source[:end].count('\n') + 1
            )

def chunk_typescript_file(file_path: str, source: str) -> Generator[CodeChunk, None, None]:
    """Split a TypeScript/Playwright file into test-block chunks."""
    module = detect_module(file_path)

    # Yield the full file for small files
    if len(source) < 3000:
        yield CodeChunk(file_path, module, 'typescript', source, 1, source.count('\n') + 1)
        return

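    # Split on top-level test(), it(), and describe() calls. The pattern is anchored
    # to the start of a line, so indented (nested) blocks stay inside their parent,
    # and dotted forms such as test.describe( are not matched.
    test_pattern = re.compile(r'^(?:test|it|describe)\s*\(', re.MULTILINE)
    positions = [m.start() for m in test_pattern.finditer(source)] + [len(source)]

    # No top-level blocks found: index the (truncated) file rather than nothing
    if len(positions) == 1:
        yield CodeChunk(file_path, module, 'typescript', source[:3000], 1, source.count('\n') + 1)
        return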

    for i in range(len(positions) - 1):
        chunk_text = source[positions[i]:positions[i + 1]]
        if len(chunk_text) > 50:
            yield CodeChunk(
                file_path=file_path,
                module=module,
                language='typescript',
                chunk_text=chunk_text[:3000],
                start_line=source[:positions[i]].count('\n') + 1,
                end_line=source[:positions[i + 1]].count('\n') + 1
            )
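
The brace counter in chunk_java_file counts every { and }, including ones inside string literals and comments, so a line such as log.info("}") can end a method chunk early. A possible hardening is sketched below. find_block_end is a hypothetical helper, not part of the pipeline above, and it does not handle Java text blocks (triple-quoted strings).

```python
def find_block_end(source: str, open_idx: int) -> int:
    """Return the index just past the '}' that closes the '{' at open_idx, or -1.

    Braces inside string/char literals and inside // or /* */ comments are ignored.
    """
    depth = 0
    i = open_idx
    n = len(source)
    while i < n:
        ch = source[i]
        two = source[i:i + 2]
        if two == '//':                       # line comment: jump to end of line
            newline = source.find('\n', i)
            i = n if newline == -1 else newline
            continue
        if two == '/*':                       # block comment: jump past closing */
            close = source.find('*/', i + 2)
            i = n if close == -1 else close + 2
            continue
        if ch in ('"', "'"):                  # string or char literal
            quote = ch
            i += 1
            while i < n and source[i] != quote:
                i += 2 if source[i] == '\\' else 1   # step over escaped characters
            i += 1                            # step over the closing quote
            continue
        if ch == '{':
            depth += 1
        elif ch == '}':
            depth -= 1
            if depth == 0:
                return i + 1
        i += 1
    return -1                                 # unbalanced: no matching brace found
```

Inside chunk_java_file, the inner loop could then become end = find_block_end(source, source.index('{', match.start())), skipping the method when the result is -1.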

2. Embedding with Amazon Bedrock Titan

import boto3
import json

bedrock = boto3.client('bedrock-runtime', region_name='us-east-1')

def embed_text(text: str) -> list[float]:
    """Generate a 1024-dim embedding using Amazon Titan Text Embeddings V2."""
    response = bedrock.invoke_model(
        modelId='amazon.titan-embed-text-v2:0',
        body=json.dumps({
            "inputText": text,
            "dimensions": 1024,  # V2 accepts 256, 512, or 1024 (not 1536)
            "normalize": True
        }),
        contentType='application/json',
        accept='application/json'
    )
    return json.loads(response['body'].read())['embedding']
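
Titan Text Embeddings V2 accepts only a fixed set of output sizes (256, 512, or 1024), and the chosen size has to match the vector(N) column. The sketch below validates the request body before it reaches Bedrock. build_titan_v2_body is a hypothetical helper name, and rejecting empty input early is an assumption about what is worth checking client-side.

```python
import json

# Output sizes accepted by Titan Text Embeddings V2
TITAN_V2_DIMENSIONS = (256, 512, 1024)

def build_titan_v2_body(text: str, dimensions: int = 1024, normalize: bool = True) -> str:
    """Build the JSON request body for amazon.titan-embed-text-v2:0."""
    if dimensions not in TITAN_V2_DIMENSIONS:
        raise ValueError(
            f"dimensions must be one of {TITAN_V2_DIMENSIONS}, got {dimensions}"
        )
    if not text.strip():
        raise ValueError("inputText must not be empty")
    return json.dumps({
        "inputText": text,
        "dimensions": dimensions,
        "normalize": normalize,
    })
```

The returned string can be passed as the body argument of bedrock.invoke_model in embed_text.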

3. Storing Chunks in pgvector

import psycopg2
from psycopg2.extras import execute_batch

def get_db_connection(secret: dict):
    return psycopg2.connect(
        host=secret['host'],
        port=secret['port'],
        dbname=secret['dbname'],
        user=secret['username'],
        password=secret['password']
    )

def upsert_chunks(conn, chunks: list[CodeChunk], embeddings: list[list[float]]):
    with conn.cursor() as cur:
        execute_batch(cur, """
            INSERT INTO code_chunks (repo, file_path, chunk_text, embedding, module, language, updated_at)
            VALUES (%(repo)s, %(file_path)s, %(chunk_text)s, %(embedding)s::vector,
                    %(module)s, %(language)s, NOW())
            ON CONFLICT (repo, file_path, chunk_text)
            DO UPDATE SET embedding = EXCLUDED.embedding, updated_at = NOW()
        """, [
            {
                'repo': 'taskmaster',
                'file_path': c.file_path,
                'chunk_text': c.chunk_text,
                'embedding': embeddings[i],
                'module': c.module,
                'language': c.language
            }
            for i, c in enumerate(chunks)
        ])
    conn.commit()
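
Because chunk_text is part of the conflict key, an edited method does not match its old row: the new text is inserted as a new row and the stale one remains retrievable. One way around this, sketched under the assumption that a file is always re-indexed as a whole, is to delete the file's rows and insert the fresh ones in a single transaction. replace_file_chunks is a hypothetical helper, and conn is assumed to be a psycopg2 connection with autocommit off (the default).

```python
def replace_file_chunks(conn, repo: str, file_path: str,
                        chunk_texts: list[str], embeddings: list[list[float]],
                        module: str, language: str) -> int:
    """Replace every stored chunk of one file; returns the number of chunks submitted."""
    if len(chunk_texts) != len(embeddings):
        raise ValueError("chunk_texts and embeddings must have the same length")

    with conn.cursor() as cur:
        # Remove all previous chunks of this file, including ones whose text changed
        cur.execute(
            "DELETE FROM code_chunks WHERE repo = %s AND file_path = %s",
            (repo, file_path),
        )
        for text, vec in zip(chunk_texts, embeddings):
            cur.execute(
                """
                INSERT INTO code_chunks
                    (repo, file_path, chunk_text, embedding, module, language, updated_at)
                VALUES (%s, %s, %s, %s::vector, %s, %s, NOW())
                ON CONFLICT DO NOTHING
                """,
                (repo, file_path, text, vec, module, language),
            )
    # DELETE and INSERTs become visible together
    conn.commit()
    return len(chunk_texts)
```

If any statement fails, the caller should call conn.rollback() so the file's previous chunks stay in place.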

4. Full Indexing Script

#!/usr/bin/env python3
"""
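index_codebase.py: clone the taskmaster repo and index all Java/TS files.
Usage: python3 index_codebase.py

Assumes the functions from sections 1-3 (chunk_java_file, chunk_typescript_file,
embed_text, get_db_connection, upsert_chunks) are defined in or imported into this file.
"""
import boto3
import json
import subprocess
import tempfile
from pathlib import Path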

def get_secret(secret_id: str) -> dict:
    client = boto3.client('secretsmanager', region_name='us-east-1')
    return json.loads(client.get_secret_value(SecretId=secret_id)['SecretString'])

def clone_repo(github_secret: dict, target_dir: str) -> None:
    token = github_secret['token']
    owner = github_secret['repo_owner']
    repo = github_secret['repo_name']
    url = f"https://x-access-token:{token}@github.com/{owner}/{repo}.git"
    subprocess.run(['git', 'clone', '--depth=1', url, target_dir], check=True)

def index_repo(repo_dir: str, conn, include_extensions=('.java', '.ts')):
    skip_dirs = {'target', 'node_modules', '.git'}

    all_chunks = []
    for ext in include_extensions:
        for path in Path(repo_dir).rglob(f'*{ext}'):
            # Skip build output, dependencies, and git metadata
            if skip_dirs.intersection(path.relative_to(repo_dir).parts):
                continue

            source = path.read_text(encoding='utf-8', errors='ignore')
            rel_path = str(path.relative_to(repo_dir))

            if ext == '.java':
                all_chunks.extend(chunk_java_file(rel_path, source))
            elif ext == '.ts':
                all_chunks.extend(chunk_typescript_file(rel_path, source))

    # Batch embed (Bedrock has no batch endpoint — do in groups of 10)
    BATCH_SIZE = 10
    for i in range(0, len(all_chunks), BATCH_SIZE):
        batch = all_chunks[i:i + BATCH_SIZE]
        embeddings = [embed_text(c.chunk_text) for c in batch]
        upsert_chunks(conn, batch, embeddings)
        print(f"  Indexed {min(i + BATCH_SIZE, len(all_chunks))}/{len(all_chunks)} chunks")

if __name__ == '__main__':
    github_secret = get_secret('taskmaster/github')
    db_secret = get_secret('taskmaster/db')

    with tempfile.TemporaryDirectory() as tmpdir:
        print("Cloning repo...")
        clone_repo(github_secret, tmpdir)

        print("Connecting to DB...")
        conn = get_db_connection(db_secret)

        print("Indexing codebase...")
        index_repo(tmpdir, conn)

        conn.close()
        print("✅ Indexing complete!")

Run it once to bootstrap the index:

source .venv/bin/activate
python3 index_codebase.py

5. Retrieval at Query Time

def retrieve_relevant_code(query: str, conn, module_filter: str = None,
                            top_k: int = 5) -> list[dict]:
    """Retrieve top-K most relevant code chunks for a given query."""
    query_embedding = embed_text(query)

    with conn.cursor() as cur:
        if module_filter:
            cur.execute("""
                SELECT file_path, module, language, chunk_text,
                       1 - (embedding <=> %s::vector) AS similarity
                FROM code_chunks
                WHERE repo = 'taskmaster' AND module = %s
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            """, (query_embedding, module_filter, query_embedding, top_k))
        else:
            cur.execute("""
                SELECT file_path, module, language, chunk_text,
                       1 - (embedding <=> %s::vector) AS similarity
                FROM code_chunks
                WHERE repo = 'taskmaster'
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            """, (query_embedding, query_embedding, top_k))

        return [
            {
                'file_path': row[0],
                'module': row[1],
                'language': row[2],
                'chunk_text': row[3],
                'similarity': float(row[4])
            }
            for row in cur.fetchall()
        ]
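
In pgvector, <=> is the cosine distance operator, so 1 - (embedding <=> q) is cosine similarity and ORDER BY embedding <=> q returns the closest chunks first. The same ranking in plain Python, as an illustrative sketch with made-up 3-dimensional vectors and placeholder file names:

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    """1 - cos(angle between a and b); 0 means same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

def rank_chunks(query_vec, rows, top_k=5):
    """rows: list of (file_path, embedding). Returns [(file_path, similarity)], best first."""
    # Mirrors ORDER BY embedding <=> query LIMIT top_k
    nearest = sorted(rows, key=lambda row: cosine_distance(row[1], query_vec))[:top_k]
    # Mirrors SELECT 1 - (embedding <=> query) AS similarity
    return [(path, 1.0 - cosine_distance(vec, query_vec)) for path, vec in nearest]

# Made-up vectors purely for illustration
rows = [
    ("A.java", [1.0, 0.0, 0.0]),
    ("B.java", [0.0, 1.0, 0.0]),
    ("C.java", [0.6, 0.8, 0.0]),
]
top = rank_chunks([1.0, 0.0, 0.0], rows, top_k=2)
```

Here A.java ranks first because it points in the same direction as the query, C.java second with a similarity of about 0.6, and the orthogonal B.java is cut off by top_k.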

6. Incremental Re-Index on Push

When the agent pushes a fix branch, a GitHub push webhook enqueues an SQS message that triggers a Lambda, which re-indexes only the changed files:

# lambda/reindex-trigger/handler.py
import json

def handler(event, context):
    """Triggered by SQS message from GitHub push webhook."""
    for record in event['Records']:
        body = json.loads(record['body'])
        if body.get('source') != 'github':
            continue

        payload = body['payload']
        # In a push payload, 'added' and 'modified' are lists of path strings
        changed_files = sorted({
            path for commit in payload.get('commits', [])
            for path in commit.get('added', []) + commit.get('modified', [])
            if path.endswith(('.java', '.ts'))
        })

        if changed_files:
            print(f"Re-indexing {len(changed_files)} changed files")
            # Re-run indexing for only the changed files.
            # reindex_files is not defined in this document; it is expected to
            # chunk, embed, and upsert each listed path.
            reindex_files(changed_files)
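
The handler above ignores deleted files, so their chunks would stay in the index. The sketch below splits a push payload into paths to re-index and paths to purge. split_push_changes is a hypothetical helper, and it assumes the commits array is ordered oldest first so that the last action on a path wins.

```python
def split_push_changes(payload: dict,
                       extensions: tuple[str, ...] = ('.java', '.ts')
                       ) -> tuple[list[str], list[str]]:
    """Return (paths_to_index, paths_to_delete) for a GitHub push payload."""
    exists_after_push: dict[str, bool] = {}
    for commit in payload.get('commits', []):
        # 'added', 'modified', and 'removed' are lists of repo-relative path strings
        for path in commit.get('added', []) + commit.get('modified', []):
            exists_after_push[path] = True
        for path in commit.get('removed', []):
            exists_after_push[path] = False

    # Keep only the file types the indexer handles
    relevant = {p: alive for p, alive in exists_after_push.items()
                if p.endswith(extensions)}
    to_index = sorted(p for p, alive in relevant.items() if alive)
    to_delete = sorted(p for p, alive in relevant.items() if not alive)
    return to_index, to_delete
```

The second list can drive a DELETE FROM code_chunks WHERE repo = %s AND file_path = %s for each removed path.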

pgvector Schema Reference

Column      Type          Description
id          SERIAL        Primary key
repo        TEXT          Repository name (e.g., taskmaster)
file_path   TEXT          Relative path within the repo
chunk_text  TEXT          The source code chunk (capped at 3000 chars; 4000 for class-level chunks)
embedding   vector(1024)  Titan Text Embeddings V2 vector
module      TEXT          Module name (taskmaster-core, taskmaster-api, etc.)
language    TEXT          java or typescript
updated_at  TIMESTAMPTZ   Last indexed timestamp
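
The table definition itself is not shown in this section. Below is a sketch of DDL consistent with the columns above and with the ON CONFLICT (repo, file_path, chunk_text) clause in upsert_chunks, which needs a matching unique constraint or index to work. schema_ddl, the constraint name, and the NOT NULL and DEFAULT choices are assumptions. The dimension is a parameter so it can follow whichever embedding model is used.

```python
def schema_ddl(dimensions: int) -> str:
    """Return DDL for the code_chunks table with a vector column of the given size."""
    if not isinstance(dimensions, int) or dimensions <= 0:
        raise ValueError("dimensions must be a positive integer")
    return f"""
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS code_chunks (
    id          SERIAL PRIMARY KEY,
    repo        TEXT NOT NULL,
    file_path   TEXT NOT NULL,
    chunk_text  TEXT NOT NULL,
    embedding   vector({dimensions}),
    module      TEXT,
    language    TEXT,
    updated_at  TIMESTAMPTZ DEFAULT NOW(),
    -- Required by ON CONFLICT (repo, file_path, chunk_text) in upsert_chunks
    CONSTRAINT code_chunks_unique_chunk UNIQUE (repo, file_path, chunk_text)
);
""".strip()
```

With a psycopg2 connection, cur.execute(schema_ddl(1024)) followed by conn.commit() would create the table sized for Titan Text Embeddings V2's default output.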

Why chunk at method level rather than fixed line count?

Method-level chunks ensure each chunk is semantically complete. A fixed 50-line window might split a method in half, making retrieval less useful. The agent needs to see the full getSummary() method to understand and fix it.

How many chunks does the taskmaster repo produce?

Approximately 30–60 chunks for the initial scaffold (3 modules, ~10 files, 3–8 methods each). At $0.0001 per 1K tokens for Titan Embeddings, the full initial index costs under $0.01.
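
The cost estimate can be checked with quick arithmetic. The sketch below assumes roughly 4 characters per token (a common rule of thumb, not a Titan-specific figure) and uses the chunk cap and per-token price quoted in this document. estimate_index_cost is a hypothetical helper.

```python
def estimate_index_cost(num_chunks: int,
                        chars_per_chunk: int = 3000,       # method-chunk cap used by the chunkers
                        chars_per_token: float = 4.0,      # assumption: rough heuristic
                        usd_per_1k_tokens: float = 0.0001  # price quoted in this document
                        ) -> float:
    """Upper-bound embedding cost in USD, treating every chunk as full-size."""
    tokens = num_chunks * chars_per_chunk / chars_per_token
    return tokens / 1000 * usd_per_1k_tokens

worst_case = estimate_index_cost(60)
```

With 60 full-size chunks this comes to 45,000 tokens, or about $0.0045, which is consistent with the "under $0.01" figure.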

Can I use OpenAI embeddings instead of Bedrock Titan?

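Yes. Replace the body of embed_text() with a call to openai.embeddings.create(model='text-embedding-3-small', input=text) and return response.data[0].embedding. The vector dimensions differ (1024 by default for Titan Text Embeddings V2, 1536 for text-embedding-3-small and ada-002, 3072 for text-embedding-3-large), so match the vector(N) column definition accordingly and re-embed all existing chunks, since vectors from different models are not comparable.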