AI Agent Architecture Patterns: From Simple to Complex
Choosing the right architecture pattern is crucial for building reliable AI agents. This guide covers the essential patterns, from simple request-response systems to complex multi-agent networks, with trade-offs and use cases for each.
Pattern 1: Simple Request-Response
The most basic agent pattern - perfect for single-shot tasks.
User Request → Agent → LLM → Response
When to Use
- Simple Q&A and information retrieval
- One-off tasks without state
- Prototyping and learning
- Low-complexity workflows
Implementation
class SimpleAgent:
    def __init__(self, llm):
        self.llm = llm

    async def process(self, user_input):
        response = await self.llm.generate(user_input)
        return response
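A minimal runnable sketch of the pattern in use. The `StubLLM` is an assumption standing in for a real client; any object with an async `generate` method will do (the class is repeated so the snippet runs standalone):

```python
import asyncio

# Repeated from above so the snippet runs standalone
class SimpleAgent:
    def __init__(self, llm):
        self.llm = llm

    async def process(self, user_input):
        return await self.llm.generate(user_input)

class StubLLM:
    """Stand-in for a real LLM client; simply echoes the prompt."""
    async def generate(self, prompt):
        return f"echo: {prompt}"

answer = asyncio.run(SimpleAgent(StubLLM()).process("What is an agent?"))
print(answer)  # → echo: What is an agent?
```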
Pros & Cons
✅ Simple to implement
✅ Fast response time
✅ Easy to debug
❌ No memory between requests
❌ Limited capabilities
❌ No error recovery
Pattern 2: Sequential Workflow
Agents that execute steps in a predefined order.
User Request → Plan → Step 1 → Step 2 → Step N → Response
When to Use
- Multi-step processes (research → summarize → email)
- Data processing pipelines
- Content creation workflows
- Automated reporting
Implementation
class SequentialAgent:
    def __init__(self, llm, skills):
        self.llm = llm
        self.skills = skills

    async def process(self, user_input):
        # Plan the workflow
        plan = await self.llm.create_plan(user_input)
        # Execute steps sequentially
        results = []
        for step in plan.steps:
            skill = self.skills[step.skill_name]
            result = await skill.execute(step.params)
            results.append(result)
        # Generate final response
        return await self.llm.synthesize(results)
Example: Research Agent
class ResearchAgent(SequentialAgent):
    def __init__(self, llm):
        super().__init__(llm, {
            "search": WebSearchSkill(),
            "summarize": SummarySkill(),
            "write": WritingSkill(),
            "save": FileSkill()
        })

    async def research_topic(self, topic):
        plan = WorkflowPlan([
            Step("search", {"query": topic}),
            Step("summarize", {"text": "previous_result"}),
            Step("write", {"content": "previous_result"}),
            Step("save", {"filename": f"{topic}_research.md"})
        ])
        return await self.execute_plan(plan)

    async def execute_plan(self, plan):
        # Run a prebuilt plan, skipping the LLM planning step
        results = []
        for step in plan.steps:
            result = await self.skills[step.skill_name].execute(step.params)
            results.append(result)
        return await self.llm.synthesize(results)
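The `Step` and `WorkflowPlan` types used above are referenced but never defined in this guide. A minimal sketch of what they might look like; the field names are assumptions based on how the code uses them:

```python
from dataclasses import dataclass

@dataclass
class Step:
    skill_name: str   # key into the agent's skills dict
    params: dict      # arguments passed to skill.execute()

@dataclass
class WorkflowPlan:
    steps: list       # ordered list of Step objects

# Example: the first two steps of the research workflow above
plan = WorkflowPlan([
    Step("search", {"query": "python"}),
    Step("summarize", {"text": "previous_result"}),
])
assert plan.steps[0].skill_name == "search"
assert plan.steps[1].params == {"text": "previous_result"}
```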
Pros & Cons
✅ Predictable execution
✅ Easy to follow logic
✅ Good for structured tasks
❌ Rigid - can’t adapt
❌ Single point of failure
❌ No parallel processing
Pattern 3: State Machine
Agents with defined states and transitions between them.
[Idle] → [Planning] → [Executing] → [Reviewing] → [Complete]
(each non-terminal state can also transition to [Error])
When to Use
- Complex workflows with decision points
- Error recovery and retry logic
- Human-in-the-loop processes
- Approval workflows
Implementation
from enum import Enum
from dataclasses import dataclass

class AgentState(Enum):
    IDLE = "idle"
    PLANNING = "planning"
    EXECUTING = "executing"
    REVIEWING = "reviewing"
    ERROR = "error"
    COMPLETE = "complete"

@dataclass
class AgentContext:
    current_task: str
    results: list
    errors: list
    retry_count: int
    plan: object = None  # populated once planning succeeds

class StateMachineAgent:
    def __init__(self, llm, skills):
        self.llm = llm
        self.skills = skills
        self.state = AgentState.IDLE
        self.context = AgentContext("", [], [], 0)

    async def process(self, user_input):
        self.state = AgentState.PLANNING
        self.context.current_task = user_input
        while self.state != AgentState.COMPLETE:
            try:
                await self.handle_current_state()
            except Exception as e:
                await self.handle_error(e)
        return self.context.results[-1]

    async def handle_current_state(self):
        if self.state == AgentState.PLANNING:
            plan = await self.llm.create_plan(self.context.current_task)
            self.context.plan = plan
            self.state = AgentState.EXECUTING
        elif self.state == AgentState.EXECUTING:
            for step in self.context.plan.steps:
                result = await self.skills[step.skill].execute(step.params)
                self.context.results.append(result)
            self.state = AgentState.REVIEWING
        elif self.state == AgentState.REVIEWING:
            review = await self.llm.review_results(self.context.results)
            if review.needs_revision:
                self.state = AgentState.PLANNING
            else:
                self.state = AgentState.COMPLETE

    async def handle_error(self, error):
        self.context.errors.append(error)
        if self.context.retry_count < 3:
            self.context.retry_count += 1
            self.state = AgentState.PLANNING
        else:
            self.state = AgentState.ERROR
            raise Exception(f"Agent failed after 3 retries: {error}")
Pros & Cons
✅ Robust error handling
✅ Clear execution flow
✅ Good for complex workflows
❌ More complex to implement
❌ Can get stuck in loops
❌ Harder to debug
Pattern 4: Tool-Calling Agent
Modern pattern using LLM tool calling capabilities.
User Request → LLM (with tools) → Tool Call → Tool Result → LLM → Response
When to Use
- Dynamic workflow selection
- Agent needs to choose tools at runtime
- Complex decision making
- When using modern LLMs with tool support
Implementation
import json

class ToolCallingAgent:
    def __init__(self, llm, tools):
        self.llm = llm
        self.tools = tools
        self.messages = []

    async def process(self, user_input):
        self.messages.append({"role": "user", "content": user_input})
        while True:
            response = await self.llm.generate_with_tools(
                messages=self.messages,
                tools=self.tools
            )
            self.messages.append(response)
            if response.finish_reason == "tool_calls":
                # Execute tool calls
                for tool_call in response.tool_calls:
                    tool = self.tools[tool_call.function.name]
                    # Most APIs return arguments as a JSON string
                    args = json.loads(tool_call.function.arguments)
                    result = await tool.execute(**args)
                    self.messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": json.dumps(result)
                    })
            else:
                # Final response
                return response.content
Tool Definition Example
def get_weather_tool():
    return {
        "name": "get_weather",
        "description": "Get current weather for a city",
        "parameters": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "City name"}
            },
            "required": ["city"]
        }
    }

async def execute_get_weather(city):
    # Actual weather API call
    weather_api = WeatherAPI()
    return await weather_api.get_current(city)
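One way to wire the schema and executor together is a small registry keyed by tool name, so the agent can look up `self.tools[name]` exactly as `ToolCallingAgent` does above. The `StubWeatherAPI` and `Tool` class here are assumptions, since the guide does not specify them:

```python
import asyncio

class StubWeatherAPI:
    """Stand-in for a real weather client (assumption)."""
    async def get_current(self, city):
        return {"city": city, "temp_c": 21}

class Tool:
    def __init__(self, schema, executor):
        self.schema = schema        # JSON schema sent to the LLM
        self.executor = executor    # coroutine that runs the tool

    async def execute(self, **kwargs):
        return await self.executor(**kwargs)

async def get_weather(city):
    return await StubWeatherAPI().get_current(city)

tools = {
    "get_weather": Tool(
        {
            "name": "get_weather",
            "description": "Get current weather for a city",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        },
        get_weather,
    )
}

result = asyncio.run(tools["get_weather"].execute(city="Paris"))
assert result == {"city": "Paris", "temp_c": 21}
```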
Pros & Cons
✅ Dynamic tool selection
✅ Natural language reasoning
✅ Flexible workflows
❌ Depends on LLM tool quality
❌ Can be unpredictable
❌ Harder to test
Pattern 5: Multi-Agent Orchestration
Multiple specialized agents working together.
User Request → Orchestrator → Agent 1 → Agent 2 → Agent N → Orchestrator → Response
When to Use
- Complex tasks requiring different expertise
- Parallel processing needs
- Different agents for different domains
- Scalable systems
Implementation
import asyncio

class OrchestratorAgent:
    def __init__(self, llm, agents):
        self.llm = llm
        self.agents = agents

    async def process(self, user_input):
        # Analyze and delegate
        analysis = await self.llm.analyze_request(user_input)
        # Create agent tasks
        tasks = []
        for agent_name in analysis.required_agents:
            agent = self.agents[agent_name]
            # One subtask per agent (assumes analysis returns a mapping)
            subtask = analysis.subtasks[agent_name]
            tasks.append(asyncio.create_task(agent.process(subtask)))
        # Execute in parallel
        results = await asyncio.gather(*tasks)
        # Synthesize results
        return await self.llm.synthesize(results)

class ResearchAgent:
    async def process(self, task):
        # Specialized research logic
        return {"research": "research findings"}

class WritingAgent:
    async def process(self, task):
        # Specialized writing logic
        return {"content": "written content"}
Agent Communication Patterns
1. Hierarchical
Orchestrator → Specialist Agents → Orchestrator
2. Peer-to-Peer
Agent 1 ↔ Agent 2 ↔ Agent 3
3. Pipeline
Agent 1 → Agent 2 → Agent 3 → Agent 4
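Of the three, the pipeline is the simplest to sketch: each agent's output becomes the next agent's input. The agents below are trivial async functions standing in for real ones:

```python
import asyncio

async def research(task):
    return f"notes on {task}"

async def summarize(text):
    return f"summary of {text}"

async def format_report(summary):
    return f"# Report\n{summary}"

async def pipeline(task, stages):
    # Each stage consumes the previous stage's output
    result = task
    for stage in stages:
        result = await stage(result)
    return result

output = asyncio.run(pipeline("AI agents", [research, summarize, format_report]))
print(output)  # → # Report\nsummary of notes on AI agents
```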
Pros & Cons
✅ Specialized expertise
✅ Parallel processing
✅ Scalable architecture
❌ Complex coordination
❌ Communication overhead
❌ Harder to debug
Pattern 6: Event-Driven Agent
Agents that react to events and messages.
Event → Message Bus → Agent(s) → Action → New Event
When to Use
- Real-time systems
- Microservices architecture
- Reactive systems
- Event-driven workflows
Implementation
class EventDrivenAgent:
    def __init__(self, llm, skills, event_bus):
        self.llm = llm
        self.skills = skills
        self.event_bus = event_bus
        self.state = {}

    async def start(self):
        # Subscribe to events
        await self.event_bus.subscribe("user_request", self.handle_request)
        await self.event_bus.subscribe("task_complete", self.handle_completion)
        await self.event_bus.subscribe("error", self.handle_error)

    async def handle_request(self, event):
        user_input = event.data["request"]
        # Process the request
        response = await self.process_request(user_input)
        # Publish response event
        await self.event_bus.publish("response", {
            "request_id": event.id,
            "response": response
        })

    async def handle_completion(self, event):
        # Update state based on completion
        self.state[event.data["task_id"]] = "complete"
        # Trigger next steps if needed
        if self.should_continue(event.data):
            await self.start_next_task(event.data)
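The `event_bus` above is assumed to exist but never shown. A minimal in-memory version supporting the `subscribe`/`publish` calls used here might look like this; unlike the `Event` dataclass below, handlers in this sketch receive the raw payload dict:

```python
import asyncio
from collections import defaultdict

class EventBus:
    """Minimal in-memory pub/sub; handlers receive the raw payload dict."""
    def __init__(self):
        self.handlers = defaultdict(list)

    async def subscribe(self, event_type, handler):
        self.handlers[event_type].append(handler)

    async def publish(self, event_type, data):
        # Deliver the event to every handler subscribed to this type
        for handler in self.handlers[event_type]:
            await handler(data)

async def demo():
    bus = EventBus()
    seen = []

    async def on_complete(data):
        seen.append(data)

    await bus.subscribe("task_complete", on_complete)
    await bus.publish("task_complete", {"task_id": "t1"})
    return seen

assert asyncio.run(demo()) == [{"task_id": "t1"}]
```

A production bus would add error isolation per handler and likely cross-process delivery, but the interface stays the same.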
Event Types
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Event:
    type: str
    data: dict
    timestamp: datetime
    source: str

# Common event types
USER_REQUEST = "user_request"
TASK_STARTED = "task_started"
TASK_COMPLETE = "task_complete"
ERROR_OCCURRED = "error"
STATE_CHANGED = "state_changed"
Pros & Cons
✅ Real-time responsiveness
✅ Loose coupling
✅ Scalable
❌ Complex event flow
❌ Harder to trace
❌ Event ordering issues
Pattern 7: Hybrid Architecture
Combining multiple patterns for complex systems.
User Request → Orchestrator → Event Bus → Specialized Agents → Tool Calling → Response
When to Use
- Enterprise-grade systems
- Complex real-world applications
- Systems requiring multiple capabilities
- Production workloads
Implementation
class HybridAgent:
    def __init__(self, llm, agents, research_tools, writing_tools):
        # Multiple patterns combined
        self.orchestrator = OrchestratorAgent(llm, agents)
        self.event_bus = EventBus()
        self.tool_agents = {
            "research": ToolCallingAgent(llm, research_tools),
            "writing": ToolCallingAgent(llm, writing_tools)
        }
        # Connect components
        self.orchestrator.connect_to_event_bus(self.event_bus)
        for agent in self.tool_agents.values():
            agent.connect_to_event_bus(self.event_bus)

    async def process(self, user_input):
        # Start orchestration
        await self.orchestrator.process(user_input)
        # Events will flow through the system
        # Final response will be published
        return await self.wait_for_response()
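`wait_for_response` is not shown above. One common implementation correlates each request with an `asyncio` future that is resolved when the matching `response` event arrives; the sketch below makes that assumption (class and method names are illustrative):

```python
import asyncio

class ResponseWaiter:
    """Resolves a per-request future when the matching response arrives."""
    def __init__(self):
        self.pending = {}

    def expect(self, request_id):
        # Create a future the caller can await
        fut = asyncio.get_running_loop().create_future()
        self.pending[request_id] = fut
        return fut

    def on_response(self, request_id, response):
        # Called by the "response" event subscriber
        if (fut := self.pending.pop(request_id, None)) is not None:
            fut.set_result(response)

async def demo():
    waiter = ResponseWaiter()
    fut = waiter.expect("req-1")
    # Simulate the event bus delivering the response shortly after
    asyncio.get_running_loop().call_later(0.01, waiter.on_response, "req-1", "done")
    return await asyncio.wait_for(fut, timeout=1)

assert asyncio.run(demo()) == "done"
```

A timeout on the wait (as with `asyncio.wait_for` here) keeps a lost event from hanging the caller forever.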
Choosing the Right Pattern
Decision Matrix
| Requirement | Best Pattern | Why |
|---|---|---|
| Simple Q&A | Request-Response | Minimal complexity |
| Multi-step task | Sequential Workflow | Predictable flow |
| Error recovery | State Machine | Robust handling |
| Dynamic tool use | Tool-Calling | LLM-driven selection |
| Specialized tasks | Multi-Agent | Domain expertise |
| Real-time needs | Event-Driven | Immediate response |
| Complex system | Hybrid | Maximum flexibility |
Evolution Path
Most systems evolve from simple to complex:
1. Start with Request-Response (prototype)
2. Add Sequential Workflow (multi-step)
3. Implement State Machine (robustness)
4. Introduce Tool-Calling (flexibility)
5. Scale to Multi-Agent (specialization)
6. Migrate to Hybrid (production)
Implementation Best Practices
1. Start Simple
# Begin with basic pattern
agent = SimpleAgent(llm)

# Add complexity as needed
if needs_workflow:
    agent = SequentialAgent(llm, skills)
if needs_error_handling:
    agent = StateMachineAgent(llm, skills)
2. Clear Interfaces
from abc import ABC, abstractmethod

class AgentInterface(ABC):
    @abstractmethod
    async def process(self, request: Request) -> Response:
        pass

class SkillInterface(ABC):
    @abstractmethod
    async def execute(self, params: dict) -> Result:
        pass
3. Observability
import time

class ObservableAgent:
    def __init__(self, agent):
        self.agent = agent
        self.metrics = MetricsCollector()

    async def process(self, request):
        start_time = time.time()
        try:
            result = await self.agent.process(request)
            self.metrics.record_success(time.time() - start_time)
            return result
        except Exception as e:
            self.metrics.record_error(e)
            raise
4. Configuration-Driven
# agent_config.yaml
pattern: "multi_agent"
agents:
  research:
    type: "tool_calling"
    tools: ["web_search", "document_reader"]
  writing:
    type: "sequential"
    skills: ["summarize", "format"]
orchestration:
  type: "event_driven"
  events: ["task_complete", "error"]
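A sketch of a loader that turns such a config into agent instances. The registry mapping and the stub classes are assumptions, and YAML parsing is replaced by a plain dict to keep the snippet dependency-free:

```python
class ToolCallingStub:
    def __init__(self, tools):
        self.tools = tools

class SequentialStub:
    def __init__(self, skills):
        self.skills = skills

# Registry mapping config "type" strings to constructors (assumed names)
AGENT_TYPES = {
    "tool_calling": lambda cfg: ToolCallingStub(cfg["tools"]),
    "sequential": lambda cfg: SequentialStub(cfg["skills"]),
}

def build_agents(config):
    # Instantiate one agent per entry under "agents"
    return {
        name: AGENT_TYPES[cfg["type"]](cfg)
        for name, cfg in config["agents"].items()
    }

config = {
    "pattern": "multi_agent",
    "agents": {
        "research": {"type": "tool_calling", "tools": ["web_search"]},
        "writing": {"type": "sequential", "skills": ["summarize"]},
    },
}

agents = build_agents(config)
assert agents["research"].tools == ["web_search"]
```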
Testing Strategies
Unit Testing
async def test_sequential_agent():
    mock_llm = MockLLM()
    mock_skills = {"search": MockSkill()}
    agent = SequentialAgent(mock_llm, mock_skills)
    result = await agent.process("search for python tutorials")
    assert result.success
Integration Testing
async def test_multi_agent_integration():
    orchestrator = OrchestratorAgent(llm, agents)
    result = await orchestrator.process("research and write about AI")
    assert "research" in result
    assert "content" in result
Load Testing
async def test_agent_scalability():
    agent = ToolCallingAgent(llm, tools)
    # Simulate concurrent requests
    tasks = [agent.process(f"task {i}") for i in range(100)]
    results = await asyncio.gather(*tasks)
    assert all(r.success for r in results)
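When firing many concurrent requests, whether in a load test or in production, it is usually worth capping concurrency so the LLM backend is not flooded. A sketch using `asyncio.Semaphore`, with a trivial stand-in for the real agent:

```python
import asyncio

async def fake_agent(task):
    await asyncio.sleep(0)      # stand-in for real agent work
    return f"done: {task}"

async def run_with_limit(tasks, limit=10):
    sem = asyncio.Semaphore(limit)

    async def bounded(task):
        # At most `limit` requests are in flight at once
        async with sem:
            return await fake_agent(task)

    return await asyncio.gather(*(bounded(t) for t in tasks))

results = asyncio.run(run_with_limit([f"task {i}" for i in range(100)]))
assert len(results) == 100 and results[0] == "done: task 0"
```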
Performance Considerations
1. Caching
class CachedAgent:
    def __init__(self, agent, cache):
        self.agent = agent
        self.cache = cache

    async def process(self, request):
        cache_key = hash(request)
        if cached := await self.cache.get(cache_key):
            return cached
        result = await self.agent.process(request)
        await self.cache.set(cache_key, result)
        return result
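One caveat with the sketch above: Python's built-in `hash()` is randomized per process for strings and raises for dicts, so a deterministic key is safer for any cache that outlives one process. A sketch using a JSON digest:

```python
import hashlib
import json

def cache_key(request):
    # Serialize deterministically, then hash; works for JSON-shaped requests
    payload = json.dumps(request, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()

k1 = cache_key({"task": "summarize", "doc": "report.md"})
k2 = cache_key({"doc": "report.md", "task": "summarize"})
assert k1 == k2          # key order does not matter
assert len(k1) == 64     # hex-encoded sha256
```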
2. Connection Pooling
import asyncio

class PooledAgent:
    def __init__(self, agent_factory, pool_size=10):
        self.pool = asyncio.Queue(maxsize=pool_size)
        for _ in range(pool_size):
            self.pool.put_nowait(agent_factory())

    async def process(self, request):
        agent = await self.pool.get()
        try:
            return await agent.process(request)
        finally:
            self.pool.put_nowait(agent)
3. Lazy Loading
class LazyAgent:
    def __init__(self):
        self._agent = None
        self._skills = {}

    @property
    def agent(self):
        if self._agent is None:
            self._agent = self._create_agent()
        return self._agent

    async def process(self, request):
        return await self.agent.process(request)
Key Takeaway: Start simple and evolve complexity as needed. Each pattern solves specific problems - understand your requirements before choosing an architecture. The best architecture is the simplest one that meets your needs.
Next: Compare the major frameworks that implement these patterns.