LangGraph Agent Design
Level: Advanced Pre-reading: 00 · Demo Overview · 04.01 · LangGraph Deep Dive
This document details the LangGraph state machine that orchestrates the entire JIRA-to-PR automation flow — from ticket fetch through code generation to PR creation.
Agent State Schema
from typing import TypedDict, Annotated, Optional
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
# Input
ticket_key: str
thread_id: str
# Ticket info (fetched from JIRA)
ticket_summary: str
ticket_description: str
ticket_type: str # "Bug" | "Story"
ticket_labels: list[str]
acceptance_criteria: list[str]
# Analysis results
affected_modules: list[str] # e.g., ["taskmaster-core"]
affected_files: list[str] # e.g., ["taskmaster-core/src/.../TaskService.java"]
root_cause: Optional[str] # bugs only
# RAG context
retrieved_chunks: list[dict] # from pgvector
# Code generation
proposed_changes: list[dict] # [{file_path, new_content, description}]
proposed_tests: list[dict] # [{file_path, new_content, description}]
diff_summary: str # human-readable summary of all changes
# Human gate
human_approved: Optional[bool]
human_feedback: Optional[str]
# GitHub outputs
branch_name: str
pr_url: Optional[str]
# Conversation
messages: Annotated[list, add_messages]
# Control
error: Optional[str]
iteration_count: int # guard against infinite loops
State Machine — Full Graph
stateDiagram-v2
[*] --> fetch_ticket
fetch_ticket --> classify_ticket : success
fetch_ticket --> handle_error : error
classify_ticket --> identify_modules
identify_modules --> retrieve_context
retrieve_context --> generate_code_changes
generate_code_changes --> generate_tests
generate_tests --> prepare_diff_summary
prepare_diff_summary --> human_review_gate
human_review_gate --> apply_changes : approved
human_review_gate --> regenerate_with_feedback : feedback given
human_review_gate --> abort : rejected
regenerate_with_feedback --> generate_code_changes
apply_changes --> create_pull_request
create_pull_request --> post_jira_comment
post_jira_comment --> [*]
abort --> [*]
handle_error --> [*]
Node Implementations
Node 1: fetch_ticket
from langgraph.types import interrupt
from langgraph.graph import StateGraph, END
def fetch_ticket(state: AgentState) -> AgentState:
"""Fetch JIRA ticket details via MCP."""
try:
ticket = jira_mcp_client.get_ticket(state['ticket_key'])
return {
**state,
'ticket_summary': ticket['summary'],
'ticket_description': ticket['description'],
'ticket_type': ticket['type'],
'ticket_labels': ticket['labels'],
'acceptance_criteria': ticket.get('acceptance_criteria', []),
'messages': [AIMessage(content=f"✅ Fetched ticket {state['ticket_key']}: *{ticket['summary']}*")]
}
except Exception as e:
return {**state, 'error': f"Failed to fetch ticket: {e}"}
Node 2: classify_ticket
def classify_ticket(state: AgentState) -> AgentState:
"""Use LLM to confirm ticket type and extract structured info."""
prompt = f"""Classify this JIRA ticket and extract key information.
Ticket Type from JIRA: {state['ticket_type']}
Summary: {state['ticket_summary']}
Description: {state['ticket_description']}
Return JSON:
{{
"confirmed_type": "Bug" | "Story",
"technical_summary": "one sentence technical description",
"affected_components": ["list of suspected classes/modules"],
"complexity": "low" | "medium" | "high"
}}"""
response = bedrock_client.invoke(prompt)
parsed = json.loads(response)
return {
**state,
'ticket_type': parsed['confirmed_type'],
'messages': [AIMessage(content=f"📋 Classified as **{parsed['confirmed_type']}** — complexity: {parsed['complexity']}")]
}
Node 3: identify_modules
def identify_modules(state: AgentState) -> AgentState:
"""Determine which modules need to change, with minimal-scope reasoning."""
prompt = f"""Given this ticket, which modules in the TaskMaster project need to change?
Ticket: {state['ticket_summary']}
Description: {state['ticket_description']}
Type: {state['ticket_type']}
Modules available:
- taskmaster-core: domain entities (Task), repository, service (TaskService)
- taskmaster-api: REST controllers (TaskController), DTOs (TaskRequest/TaskResponse)
- taskmaster-e2e: Playwright E2E tests
Rules:
1. For bug fixes in service layer → only taskmaster-core (and its tests)
2. For stories adding new fields → taskmaster-core + taskmaster-api + taskmaster-e2e
3. NEVER modify modules that don't need to change
Return JSON:
{{
"affected_modules": ["list of module names"],
"reasoning": "why each module is included",
"files_likely_to_change": ["list of likely file paths"]
}}"""
response = bedrock_client.invoke(prompt)
parsed = json.loads(response)
return {
**state,
'affected_modules': parsed['affected_modules'],
'affected_files': parsed['files_likely_to_change'],
'messages': [AIMessage(content=f"🎯 Modules affected: **{', '.join(parsed['affected_modules'])}**\n\n_{parsed['reasoning']}_")]
}
Node 4: retrieve_context
def retrieve_context(state: AgentState) -> AgentState:
"""RAG retrieval: get relevant code chunks for each affected module."""
query = f"{state['ticket_summary']} {state['ticket_description']}"
all_chunks = []
for module in state['affected_modules']:
chunks = retrieve_relevant_code(
query=query,
conn=db_conn,
module_filter=module,
top_k=4 # 4 chunks per module → max ~16 total
)
all_chunks.extend(chunks)
# Sort by similarity and keep top 10 overall
all_chunks.sort(key=lambda x: x['similarity'], reverse=True)
top_chunks = all_chunks[:10]
context_summary = "\n".join(
f"- `{c['file_path']}` (similarity: {c['similarity']:.2f})"
for c in top_chunks
)
return {
**state,
'retrieved_chunks': top_chunks,
'messages': [AIMessage(content=f"🔍 Retrieved {len(top_chunks)} relevant code chunks:\n{context_summary}")]
}
Node 5: generate_code_changes
def generate_code_changes(state: AgentState) -> AgentState:
"""Generate the actual code changes using LLM + RAG context."""
code_context = "\n\n".join([
f"// FILE: {c['file_path']}\n{c['chunk_text']}"
for c in state['retrieved_chunks']
])
if state['ticket_type'] == 'Bug':
prompt = f"""You are fixing a bug in the TaskMaster Java project.
BUG: {state['ticket_summary']}
DESCRIPTION: {state['ticket_description']}
RELEVANT CODE:
{code_context}
Instructions:
1. Identify the exact root cause
2. Generate the minimal fix — do not refactor unrelated code
3. Only modify files in modules: {state['affected_modules']}
Return JSON:
{{
"root_cause": "exact explanation of what causes the bug",
"changes": [
{{
"file_path": "relative path from repo root",
"description": "what changed and why",
"new_content": "complete new file content"
}}
]
}}"""
else:
ac_text = "\n".join(f"- {ac}" for ac in state['acceptance_criteria'])
prompt = f"""You are implementing a story in the TaskMaster Java project.
STORY: {state['ticket_summary']}
DESCRIPTION: {state['ticket_description']}
ACCEPTANCE CRITERIA:
{ac_text}
RELEVANT CODE:
{code_context}
Instructions:
1. Implement all acceptance criteria
2. Follow existing code patterns exactly
3. Maintain backward compatibility
Return JSON:
{{
"changes": [
{{
"file_path": "relative path from repo root",
"description": "what changed and why",
"new_content": "complete new file content"
}}
]
}}"""
response = bedrock_client.invoke(prompt)
parsed = json.loads(response)
return {
**state,
'root_cause': parsed.get('root_cause'),
'proposed_changes': parsed['changes'],
'messages': [AIMessage(content=f"💡 Generated {len(parsed['changes'])} file change(s)")]
}
Node 6: generate_tests
def generate_tests(state: AgentState) -> AgentState:
"""Generate or update unit and E2E tests."""
changes_summary = "\n".join(
f"- {c['file_path']}: {c['description']}"
for c in state['proposed_changes']
)
prompt = f"""Generate tests for the following code changes to TaskMaster.
TICKET: {state['ticket_summary']}
CHANGES MADE:
{changes_summary}
EXISTING TEST CODE:
{next((c['chunk_text'] for c in state['retrieved_chunks'] if 'Test' in c['file_path']), 'No test context found')}
Generate:
1. JUnit 5 unit tests for any Java changes (using AssertJ assertions)
2. Playwright TypeScript tests if the REST API contract changed
Return JSON:
{{
"tests": [
{{
"file_path": "path to test file",
"description": "what is being tested",
"new_content": "complete test file content",
"is_new_file": true/false
}}
]
}}"""
response = bedrock_client.invoke(prompt)
parsed = json.loads(response)
return {
**state,
'proposed_tests': parsed['tests'],
'messages': [AIMessage(content=f"🧪 Generated {len(parsed['tests'])} test file(s)")]
}
Node 7: prepare_diff_summary
def prepare_diff_summary(state: AgentState) -> AgentState:
"""Prepare a human-readable summary of all proposed changes for the HITL gate."""
all_changes = state['proposed_changes'] + state['proposed_tests']
lines = [f"## Proposed Changes for {state['ticket_key']}: {state['ticket_summary']}\n"]
if state.get('root_cause'):
lines.append(f"**Root Cause:** {state['root_cause']}\n")
lines.append("### Files to be changed:\n")
for change in all_changes:
lines.append(f"**`{change['file_path']}`** — {change['description']}")
# Scope guard: flag if >50 lines changed or >1 module touched
total_lines = sum(len(c['new_content'].splitlines()) for c in all_changes)
if total_lines > 300:
lines.append(f"\n⚠️ **Large diff detected:** ~{total_lines} lines across {len(all_changes)} files. Extra review recommended.")
if len(state['affected_modules']) > 2:
lines.append(f"\n⚠️ **Multi-module change:** {', '.join(state['affected_modules'])}. Verify scope is appropriate.")
return {
**state,
'diff_summary': '\n'.join(lines),
'messages': [AIMessage(content='\n'.join(lines))]
}
Node 8: human_review_gate (HITL Interrupt)
def human_review_gate(state: AgentState) -> AgentState:
"""
INTERRUPT: pause execution and surface the diff to the user.
Resumes when the user calls /threads/{thread_id}/resume with their decision.
"""
# This interrupt suspends the graph — execution halts here.
# The chat engine persists state to PostgreSQL via checkpointer.
# The user sees the diff_summary and types "approve", "reject", or provides feedback.
user_response = interrupt({
"prompt": f"{state['diff_summary']}\n\n**Do you approve these changes?** (type `approve`, `reject`, or provide feedback for revision)",
"diff_summary": state['diff_summary'],
"ticket_key": state['ticket_key']
})
if user_response.lower().strip() == 'approve':
return {**state, 'human_approved': True}
elif user_response.lower().strip() == 'reject':
return {**state, 'human_approved': False}
else:
# User provided feedback — loop back for regeneration
return {**state, 'human_approved': None, 'human_feedback': user_response}
Node 9: apply_changes
def apply_changes(state: AgentState) -> AgentState:
"""Create a branch and commit all approved changes via GitHub API."""
branch_name = make_branch_name(state['ticket_key'], state['ticket_summary'])
# Create the branch
create_branch(
owner=github_secret['repo_owner'],
repo=github_secret['repo_name'],
token=github_secret['token'],
branch_name=branch_name
)
# Commit each file
all_changes = state['proposed_changes'] + state['proposed_tests']
for change in all_changes:
commit_file_change(
owner=github_secret['repo_owner'],
repo=github_secret['repo_name'],
token=github_secret['token'],
branch=branch_name,
file_path=change['file_path'],
new_content=change['new_content'],
commit_message=f"fix({state['ticket_key']}): {change['description']}"
)
return {
**state,
'branch_name': branch_name,
'messages': [AIMessage(content=f"📤 Pushed {len(all_changes)} file(s) to branch `{branch_name}`")]
}
Node 10: create_pull_request
def create_pull_request(state: AgentState) -> AgentState:
"""Open a GitHub PR with the agent-generated changes."""
jira_url = f"{jira_secret['base_url']}/browse/{state['ticket_key']}"
pr_body = build_pr_body(
ticket_key=state['ticket_key'],
ticket_url=jira_url,
ticket_type=state['ticket_type'],
summary=state['ticket_summary'],
changes=[c['description'] for c in state['proposed_changes'] + state['proposed_tests']],
)
pr_url = create_pull_request(
owner=github_secret['repo_owner'],
repo=github_secret['repo_name'],
token=github_secret['token'],
branch=state['branch_name'],
title=f"[{state['ticket_key']}] {state['ticket_summary']}",
body=pr_body
)
return {
**state,
'pr_url': pr_url,
'messages': [AIMessage(content=f"🚀 Pull Request created: {pr_url}")]
}
Graph Assembly
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
def build_agent_graph():
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node("fetch_ticket", fetch_ticket)
workflow.add_node("classify_ticket", classify_ticket)
workflow.add_node("identify_modules", identify_modules)
workflow.add_node("retrieve_context", retrieve_context)
workflow.add_node("generate_code_changes", generate_code_changes)
workflow.add_node("generate_tests", generate_tests)
workflow.add_node("prepare_diff_summary", prepare_diff_summary)
workflow.add_node("human_review_gate", human_review_gate)
workflow.add_node("apply_changes", apply_changes)
workflow.add_node("create_pull_request", create_pull_request)
workflow.add_node("post_jira_comment", post_jira_comment)
# Define flow
workflow.set_entry_point("fetch_ticket")
workflow.add_edge("fetch_ticket", "classify_ticket")
workflow.add_edge("classify_ticket", "identify_modules")
workflow.add_edge("identify_modules", "retrieve_context")
workflow.add_edge("retrieve_context", "generate_code_changes")
workflow.add_edge("generate_code_changes", "generate_tests")
workflow.add_edge("generate_tests", "prepare_diff_summary")
workflow.add_edge("prepare_diff_summary", "human_review_gate")
# Conditional: after HITL gate
workflow.add_conditional_edges("human_review_gate", route_after_human_gate, {
"apply": "apply_changes",
"revise": "generate_code_changes", # loop back with feedback
"abort": END
})
workflow.add_edge("apply_changes", "create_pull_request")
workflow.add_edge("create_pull_request", "post_jira_comment")
workflow.add_edge("post_jira_comment", END)
# PostgreSQL checkpointer for state persistence + HITL resume
checkpointer = PostgresSaver.from_conn_string(build_pg_conn_string())
return workflow.compile(checkpointer=checkpointer)
def route_after_human_gate(state: AgentState) -> str:
if state.get('human_approved') is True:
return "apply"
elif state.get('human_feedback'):
if state['iteration_count'] >= 3:
return "abort" # prevent infinite revision loops
return "revise"
else:
return "abort"
How does the checkpointer enable the HITL interrupt to work?
When interrupt() is called, LangGraph serialises the entire AgentState to PostgreSQL and suspends execution. The thread ID is returned to the chat engine. When the user responds, the chat engine calls graph.invoke(Command(resume=user_response), config={'thread_id': thread_id}) which reloads state from PostgreSQL and continues from the interrupt point.
What prevents the agent from looping forever on regeneration?
iteration_count is incremented on each pass through generate_code_changes. The route_after_human_gate function hard-caps revisions at 3 iterations. After 3 unsuccessful attempts the flow aborts and notifies the user.
How does the agent know to only change taskmaster-core for TASK-101?
The identify_modules node reasons explicitly about the project structure. The prompt includes the module descriptions and a rule: "for service-layer bugs, only touch taskmaster-core." This is then enforced in generate_code_changes which constrains the file paths to the identified modules.