AI System Design Patterns 2026: Rate Limiting, Caching, Fallbacks
Production patterns for reliable, cost-efficient AI applications
AI System Design Patterns 2026: Rate Limiting, Caching, Fallbacks
Production patterns for reliable, cost-efficient AI applications
Essential system design patterns for production AI applications: token budgeting, response caching, fallback chains, circuit breakers, and monitoring. Reduce costs 60-80% while improving reliability.
AI System Design Patterns 2026: Building Reliable LLM Applications
Production AI applications require more than API calls. Here are the essential patterns for building reliable, cost-efficient systems.
1. Response Caching
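Cache identical queries (an exact match on the hashed request) to cut costs dramatically; near-duplicate queries would need a semantic, embedding-based cache, which is not shown here: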
python
import hashlib
import json
import redis
from anthropic import Anthropic


class CachedLLM:
    """Anthropic chat client with an exact-match Redis response cache.

    A response is stored under a SHA-256 hash of the model, the messages and
    any extra request parameters, and expires after ``ttl`` seconds.
    """

    # Used when the caller does not pass ``max_tokens`` explicitly.
    DEFAULT_MAX_TOKENS = 4096

    def __init__(self, ttl: int = 3600):
        """Create the API client and the Redis connection.

        Args:
            ttl: Lifetime of a cached response in seconds (default: 1 hour).
        """
        self.client = Anthropic()
        self.cache = redis.Redis(host='localhost', port=6379, db=0)
        self.ttl = ttl

    def _cache_key(self, messages: list, model: str, params: dict = None) -> str:
        """Return the Redis key for one request.

        Args:
            messages: Chat messages sent to the model.
            model: Model identifier.
            params: Extra request parameters (e.g. ``system``,
                ``temperature``). They change the answer, so they must be
                part of the key. When empty or omitted the key depends on
                ``model`` and ``messages`` only.

        Returns:
            A key of the form ``llm:<sha256 hex digest>``.
        """
        payload = {'model': model, 'messages': messages}
        if params:
            payload['params'] = params
        # default=repr: the JSON text is only hashed, never parsed back, so a
        # non-JSON value must not make key construction fail.
        content = json.dumps(payload, sort_keys=True, default=repr)
        return f'llm:{hashlib.sha256(content.encode()).hexdigest()}'

    def complete(self, messages: list, model: str = 'claude-sonnet-4-5', **kwargs) -> str:
        """Return the model's reply, served from the cache when possible.

        Args:
            messages: Chat messages to send.
            model: Model identifier.
            **kwargs: Extra parameters forwarded to ``messages.create``. A
                caller-supplied ``max_tokens`` overrides the default.

        Returns:
            The text of the first content block of the response.
        """
        key = self._cache_key(messages, model, kwargs)

        # Check cache (an empty value is treated as a miss)
        cached = self.cache.get(key)
        if cached:
            return json.loads(cached)['response']

        # Call API. Merging through a dict lets the caller override
        # max_tokens instead of hitting a duplicate-keyword TypeError.
        request = {'max_tokens': self.DEFAULT_MAX_TOKENS, **kwargs}
        response = self.client.messages.create(
            model=model,
            messages=messages,
            **request,
        )
        result = response.content[0].text

        # Store in cache
        self.cache.setex(
            key,
            self.ttl,
            json.dumps({'response': result, 'model': model}),
        )
        return result
2. Fallback Chain
Gracefully degrade when a provider fails:
python
from openai import OpenAI
from anthropic import Anthropic
import google.generativeai as genai
import os
from typing import Optional


class FallbackLLM:
    """Try a fixed, ordered list of LLM providers until one of them answers."""

    def __init__(self):
        """Register the providers in order of preference."""
        self.providers = [
            ('claude-sonnet-4-5', self._call_anthropic),
            ('gpt-5', self._call_openai),
            ('gemini-2.5-pro', self._call_gemini),
        ]

    def _call_anthropic(self, messages: list, model: str) -> str:
        """Send ``messages`` to Anthropic and return the first text block."""
        client = Anthropic()
        resp = client.messages.create(model=model, max_tokens=4096, messages=messages)
        return resp.content[0].text

    def _call_openai(self, messages: list, model: str) -> str:
        """Send ``messages`` to OpenAI and return the first choice's content."""
        client = OpenAI()
        resp = client.chat.completions.create(model=model, messages=messages)
        return resp.choices[0].message.content

    def _call_gemini(self, messages: list, model: str) -> str:
        """Send the last message's content to Gemini and return the reply text.

        Only the final message is forwarded; earlier history is not sent.
        """
        # The API key is read from the environment: no GOOGLE_API_KEY
        # constant is defined in this module, so referencing one by bare
        # name made this provider fail with NameError on every call.
        genai.configure(api_key=os.environ['GOOGLE_API_KEY'])
        m = genai.GenerativeModel(model)
        user_msg = messages[-1]['content'] if messages else ''
        return m.generate_content(user_msg).text

    def complete(self, messages: list) -> Optional[str]:
        """Return the reply of the first provider that succeeds.

        Args:
            messages: Chat messages, passed unchanged to each provider.

        Returns:
            The provider's reply.

        Raises:
            RuntimeError: Every provider raised. The last provider error is
                attached as ``__cause__``.
        """
        last_error = None
        for model, provider_fn in self.providers:
            try:
                return provider_fn(messages, model)
            except Exception as e:
                # Deliberately broad: any provider failure moves on to the
                # next provider in the chain.
                print(f'Provider {model} failed: {e}')
                last_error = e
        raise RuntimeError(
            f'All providers failed. Last error: {last_error}'
        ) from last_error
3. Token Budget Management
python
import tiktoken
from typing import List


class TokenBudgetManager:
    """Trim chat history so that a request fits inside a token budget."""

    def __init__(self, max_context_tokens: int = 100_000):
        """Load the tokenizer and remember the context limit.

        Args:
            max_context_tokens: Total tokens allowed for system prompt,
                messages and the reserved response space.
        """
        # NOTE(review): cl100k_base is presumably only an approximation for
        # non-OpenAI models - confirm against the provider's own counter.
        self.encoder = tiktoken.get_encoding('cl100k_base')
        self.max_tokens = max_context_tokens

    def count_tokens(self, text: str) -> int:
        """Return the number of tokens the encoder produces for ``text``."""
        return len(self.encoder.encode(text))

    def truncate_messages(
        self,
        messages: List[dict],
        system: str = '',
        reserve_tokens: int = 500,
    ) -> List[dict]:
        """Drop the oldest messages until the conversation fits the budget.

        The last message (the current query) is always kept, even when it
        alone exceeds the budget. Older messages are added newest-first and
        the scan stops at the first message that no longer fits, so the
        result is always a contiguous tail of ``messages``.

        Args:
            messages: Chat messages, oldest first; each needs a ``'content'``
                string.
            system: System prompt whose tokens count against the budget.
            reserve_tokens: Tokens held back for the model's response.

        Returns:
            The kept messages in their original order. An empty input is
            returned unchanged.
        """
        if not messages:
            return messages

        budget = self.max_tokens - self.count_tokens(system) - reserve_tokens
        *history, last_msg = messages
        remaining = budget - self.count_tokens(last_msg['content'])

        # Collect newest-to-oldest with append and reverse once at the end;
        # inserting at index 0 on every step would be quadratic.
        kept = []
        for msg in reversed(history):
            msg_tokens = self.count_tokens(msg['content'])
            if msg_tokens > remaining:
                break  # Stop adding history
            kept.append(msg)
            remaining -= msg_tokens
        kept.reverse()
        kept.append(last_msg)
        return kept
4. Circuit Breaker
python
import time
from enum import Enum


class CircuitState(Enum):
    """Operating states of a ``CircuitBreaker``."""

    CLOSED = 'closed'        # Normal operation
    OPEN = 'open'            # Failing, reject calls
    HALF_OPEN = 'half_open'  # Testing recovery


class CircuitOpenError(Exception):
    """Raised by ``CircuitBreaker.call`` when a call is skipped.

    It subclasses ``Exception``, so existing ``except Exception`` handlers
    keep working, while new code can tell a skipped call apart from a
    failure of the wrapped function.
    """


class CircuitBreaker:
    """Stop calling a failing dependency until a cool-down has passed."""

    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        """Start in the CLOSED state.

        Args:
            failure_threshold: Failures without an intervening success that
                open the circuit.
            timeout: Seconds the circuit stays open before one trial call
                is allowed through.
        """
        self.failures = 0
        self.threshold = failure_threshold
        self.timeout = timeout
        self.state = CircuitState.CLOSED
        # time.monotonic() reading of the most recent failure (not an epoch
        # timestamp); None until the first failure.
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` under circuit-breaker protection.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitOpenError: The circuit is open and the timeout has not
                elapsed yet; ``func`` is not called.
            Exception: Any exception from ``func`` is recorded as a failure
                and re-raised unchanged.
        """
        if self.state == CircuitState.OPEN:
            # A monotonic clock keeps the cool-down correct across
            # wall-clock adjustments (NTP, DST, manual changes).
            if time.monotonic() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError('Circuit breaker is OPEN - skipping call')

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        """Reset the failure count and close the circuit."""
        self.failures = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Record a failure and open the circuit when warranted."""
        self.failures += 1
        self.last_failure_time = time.monotonic()
        # A failed trial call in HALF_OPEN re-opens the circuit at once.
        if self.state == CircuitState.HALF_OPEN or self.failures >= self.threshold:
            self.state = CircuitState.OPEN
5. Cost Monitoring
python
from dataclasses import dataclass, field
from typing import ClassVar, Dict


@dataclass
class UsageTracker:
    """Accumulate API spend per model and the total token count."""

    # Accumulated cost in dollars, keyed by model name.
    costs: Dict[str, float] = field(default_factory=dict)
    # Sum of input and output tokens over all recorded calls.
    total_tokens: int = 0

    # Dollar price per one million tokens. ClassVar keeps these tables out
    # of the dataclass fields (they are shared, not per-instance state).
    PRICING: ClassVar[Dict[str, Dict[str, float]]] = {
        'claude-sonnet-4-5': {'input': 3.0, 'output': 15.0},
        'gpt-5': {'input': 10.0, 'output': 30.0},
        'gpt-5-mini': {'input': 0.40, 'output': 1.60},
    }
    # Price applied to models that are missing from PRICING.
    DEFAULT_PRICING: ClassVar[Dict[str, float]] = {'input': 10, 'output': 30}

    def record(self, model: str, input_tokens: int, output_tokens: int):
        """Add one call's cost and tokens to the running totals.

        Args:
            model: Model name; unknown models are billed at DEFAULT_PRICING.
            input_tokens: Prompt tokens consumed by the call.
            output_tokens: Completion tokens produced by the call.
        """
        pricing = self.PRICING.get(model, self.DEFAULT_PRICING)
        cost = (
            input_tokens * pricing['input'] + output_tokens * pricing['output']
        ) / 1_000_000
        self.costs[model] = self.costs.get(model, 0) + cost
        self.total_tokens += input_tokens + output_tokens

    def report(self):
        """Print the total cost, then each model's cost, most expensive first."""
        total = sum(self.costs.values())
        print(f'Total API cost: ${total:.4f}')
        for model, cost in sorted(self.costs.items(), key=lambda x: -x[1]):
            print(f' {model}: ${cost:.4f}')


# Shared module-level tracker instance.
tracker = UsageTracker()
6. Prompt Versioning
python
import json
import re
from pathlib import Path


class PromptRegistry:
    """Store versioned prompts as ``<name>_v<version>.json`` files."""

    def __init__(self, prompts_dir: str = './prompts'):
        """Open the registry directory, creating it if needed.

        Args:
            prompts_dir: Directory holding the prompt files. Missing parent
                directories are created as well.
        """
        self.dir = Path(prompts_dir)
        self.dir.mkdir(parents=True, exist_ok=True)

    def save(self, name: str, prompt: str, version: str, metadata: dict = None):
        """Write one prompt version, overwriting an existing file.

        Args:
            name: Prompt name.
            prompt: Prompt text.
            version: Version label, e.g. ``'1.2'``.
            metadata: Optional extra data stored next to the prompt.
        """
        data = {'prompt': prompt, 'version': version, 'metadata': metadata or {}}
        path = self.dir / f'{name}_v{version}.json'
        path.write_text(json.dumps(data, indent=2), encoding='utf-8')

    def load(self, name: str, version: str = 'latest') -> str:
        """Return the prompt text for ``name`` at ``version``.

        Args:
            name: Prompt name.
            version: Exact version label, or ``'latest'`` for the highest
                saved version. Versions are ordered numerically, so
                ``'1.10'`` is newer than ``'1.2'``.

        Raises:
            FileNotFoundError: No file exists for the requested prompt.
        """
        if version != 'latest':
            path = self.dir / f'{name}_v{version}.json'
            return json.loads(path.read_text(encoding='utf-8'))['prompt']

        candidates = list(self._saved_versions(name))
        if not candidates:
            raise FileNotFoundError(f'No prompts found for {name}')
        # The raw label breaks ties between labels with equal numeric keys
        # (e.g. '1.02' and '1.2') so the choice is deterministic.
        _, data = max(
            candidates,
            key=lambda item: (self._version_key(item[0]), item[0]),
        )
        return data['prompt']

    def _saved_versions(self, name: str):
        """Yield ``(version, data)`` for every readable file of ``name``."""
        prefix = f'{name}_v'
        for path in self.dir.iterdir():
            if path.suffix != '.json' or not path.name.startswith(prefix):
                continue
            version = path.stem[len(prefix):]
            try:
                data = json.loads(path.read_text(encoding='utf-8'))
            except (OSError, ValueError):
                continue  # Unreadable or not JSON: not a usable candidate.
            if not isinstance(data, dict):
                continue
            # A file such as 'rag_vector_v9.0.json' also starts with
            # 'rag_v'. Its stored version ('9.0') differs from the filename
            # remainder ('ector_v9.0'), which marks it as another prompt.
            if str(data.get('version', version)) == version:
                yield version, data

    @staticmethod
    def _version_key(version: str) -> list:
        """Return a sort key that compares digit runs as integers."""
        # With a capturing group, re.split alternates text and digit runs,
        # so odd positions always hold the digit runs.
        parts = re.split(r'(\d+)', version)
        return [int(part) if i % 2 else part for i, part in enumerate(parts)]
Usage
# Create a registry rooted at the default './prompts' directory.
registry = PromptRegistry()

# Store version 1.2 of the RAG query template.
registry.save(
    name='rag_query',
    prompt='Answer based only on: {context}\n\nQ: {question}',
    version='1.2',
)

# Read the most recent saved version back.
prompt = registry.load('rag_query', version='latest')
Conclusion
Production AI applications require caching (reduce costs 60-80%), fallback chains (99.9% uptime), circuit breakers (prevent cascade failures), and usage tracking (control spend). These patterns turn a prototype into a production system.
Related Tools
Related Tutorials
Build complex multi-step AI workflows with state management using LangGraph
Chain-of-thought, tree-of-thoughts, self-consistency, and systematic evaluation methods
Deploy Llama 3 with 20x higher throughput than naive serving