Building a RAG System from Scratch: Complete Python Tutorial 2026
Build a production-quality Retrieval Augmented Generation system step by step, from document processing to API deployment
Building a RAG System from Scratch: Complete Python Tutorial 2026
Build a production-quality Retrieval Augmented Generation system step by step, from document processing to API deployment
Complete hands-on tutorial for building a RAG (Retrieval Augmented Generation) system from scratch in Python. Covers document chunking, embedding generation, vector storage, retrieval optimization, reranking, and building a production API.
Building a RAG System from Scratch: Complete Python Tutorial 2026
Retrieval Augmented Generation (RAG) is the most widely deployed LLM architecture pattern in enterprise AI. Instead of relying on an LLM's training knowledge, RAG systems retrieve relevant information from your own data at query time—enabling accurate, up-to-date answers with citations.
This tutorial builds a complete RAG system from the ground up.
Architecture Overview
[Documents] → [Chunker] → [Embedder] → [Vector DB]
                                         ↓
[User Query] → [Query Embedder] → [Retriever] → [Reranker] → [LLM] → [Answer]
Step 1: Document Processing
python
from pathlib import Path
from typing import List, Dict
import re


class DocumentProcessor:
    """Load documents and split their text into overlapping, word-based chunks.

    Args:
        chunk_size: Soft upper bound on the number of whitespace-separated
            words per chunk. A single sentence longer than this is kept whole.
        chunk_overlap: Number of trailing words of a finished chunk that are
            repeated at the start of the next chunk.
    """

    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 128):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def load_document(self, path: str) -> str:
        """Return the raw text of a .pdf, .md, .txt or .docx file.

        Raises:
            ValueError: If the file extension is not one of the supported ones.
        """
        path = Path(path)
        # Compare case-insensitively so "REPORT.PDF" is handled like "report.pdf".
        suffix = path.suffix.lower()
        if suffix == ".pdf":
            return self._load_pdf(path)
        if suffix in (".md", ".txt"):
            return path.read_text(encoding="utf-8")
        if suffix == ".docx":
            return self._load_docx(path)
        raise ValueError(f"Unsupported format: {path.suffix}")

    def _load_pdf(self, path: Path) -> str:
        """Extract text from every PDF page, separated by blank lines."""
        import pypdf

        reader = pypdf.PdfReader(str(path))
        # Guard against pages that yield no text so join() never sees None.
        return "\n\n".join((page.extract_text() or "") for page in reader.pages)

    def _load_docx(self, path: Path) -> str:
        """Join the non-empty paragraphs of a .docx file with blank lines."""
        import docx

        doc = docx.Document(str(path))
        return "\n\n".join(para.text for para in doc.paragraphs if para.text.strip())

    def clean_text(self, text: str) -> str:
        """Collapse excess whitespace and drop lines that hold only digits."""
        # Remove excessive whitespace
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r' {2,}', ' ', text)
        # Remove page numbers (lines consisting solely of digits)
        text = re.sub(r'\n\d+\n', '\n', text)
        return text.strip()

    def chunk_text(self, text: str, metadata: dict = None) -> List[Dict]:
        """Split text into overlapping chunks on sentence boundaries.

        Lengths are measured in whitespace-separated words, not model tokens.

        Args:
            text: Raw document text; it is passed through clean_text() first.
            metadata: Optional extra keys merged into every chunk dict.

        Returns:
            A list of dicts, each with "text" and "chunk_index" plus any
            metadata keys. Empty input yields an empty list.
        """
        text = self.clean_text(text)
        # Split on sentence boundaries
        sentences = re.split(r'(?<=[.!?])\s+', text)
        extra = metadata or {}
        # A negative overlap makes no sense; treat it as "no overlap".
        overlap = max(self.chunk_overlap, 0)

        chunks = []
        words = []
        for sentence in sentences:
            sentence_words = sentence.split()
            if words and len(words) + len(sentence_words) > self.chunk_size:
                chunks.append({
                    "text": " ".join(words),
                    "chunk_index": len(chunks),
                    **extra,
                })
                # Carry the configured number of trailing words into the next
                # chunk. words[-0:] would be the whole list, hence the guard.
                words = words[-overlap:] if overlap else []
            words.extend(sentence_words)

        if words:
            chunks.append({
                "text": " ".join(words),
                "chunk_index": len(chunks),
                **extra,
            })
        return chunks
# Usage
# Configure a processor, read the handbook PDF, then chunk it with source
# metadata attached to every chunk.
processor = DocumentProcessor(chunk_size=400, chunk_overlap=80)
doc_text = processor.load_document("company_handbook.pdf")
chunks = processor.chunk_text(
    doc_text,
    metadata={"source": "handbook", "type": "policy"},
)
print(f"Generated {len(chunks)} chunks")
Step 2: Embedding Generation
python
from openai import OpenAI
import numpy as np
from typing import List
import time


class EmbeddingGenerator:
    """Generate OpenAI embeddings in batches, retrying failed requests.

    Args:
        model: Name of the OpenAI embedding model to call.

    Attributes:
        dimensions: Length of the vectors the chosen model is expected to
            return; used to size empty results.
    """

    # Vector sizes of the OpenAI embedding models this tutorial may use.
    _KNOWN_DIMENSIONS = {
        "text-embedding-3-small": 1536,
        "text-embedding-3-large": 3072,
        "text-embedding-ada-002": 1536,
    }
    # Total tries per batch (1 initial call + 2 retries).
    _MAX_ATTEMPTS = 3

    def __init__(self, model: str = "text-embedding-3-small"):
        self.client = OpenAI()
        self.model = model
        self.dimensions = self._dimensions_for(model)

    @classmethod
    def _dimensions_for(cls, model: str) -> int:
        """Return the vector size for *model*, guessing for unknown names."""
        known = cls._KNOWN_DIMENSIONS.get(model)
        if known is not None:
            return known
        # Fallback heuristic for unlisted models: small=1536, otherwise 3072.
        return 1536 if "small" in model else 3072

    def _embed_batch(self, batch: List[str]) -> list:
        """Embed one batch, sleeping 1s then 2s between failed attempts."""
        for attempt in range(self._MAX_ATTEMPTS):
            try:
                response = self.client.embeddings.create(
                    input=batch,
                    model=self.model
                )
            except Exception:
                # Retry boundary: re-raise untouched once attempts run out.
                if attempt == self._MAX_ATTEMPTS - 1:
                    raise
                time.sleep(2 ** attempt)
            else:
                return [item.embedding for item in response.data]

    def embed_texts(self, texts: List[str], batch_size: int = 100) -> np.ndarray:
        """Generate embeddings with batching and retry logic.

        Args:
            texts: Strings to embed; newlines are replaced with spaces first.
            batch_size: Maximum number of texts sent per API request.

        Returns:
            A 2-D array with one row per input text. Empty input returns an
            array of shape (0, self.dimensions) without calling the API.

        Raises:
            Exception: Whatever the client raised on the final failed attempt.
        """
        if not texts:
            return np.empty((0, self.dimensions))

        all_embeddings = []
        for i in range(0, len(texts), batch_size):
            # Clean for embedding
            batch = [t.replace("\n", " ") for t in texts[i:i + batch_size]]
            all_embeddings.extend(self._embed_batch(batch))
            print(f"Embedded {min(i + batch_size, len(texts))}/{len(texts)} texts")
        return np.array(all_embeddings)
# Example: build an embedder for the small embedding model. Constructing it
# creates an OpenAI client.
embedder = EmbeddingGenerator(model="text-embedding-3-small")
Step 3: Vector Storage with Qdrant
python
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance, VectorParams, PointStruct,
Filter, FieldCondition, MatchValue
)
import uuid


class VectorStore:
    """Qdrant-backed storage and similarity search for embedded chunks.

    Args:
        collection_name: Qdrant collection to use; created with cosine
            distance if it does not exist yet.
        embedding_dim: Vector size used when the collection is created.
        host: Hostname of the Qdrant server.
        port: Port of the Qdrant server.
    """

    # Number of points sent per upsert request.
    _UPSERT_BATCH_SIZE = 100

    def __init__(self, collection_name: str, embedding_dim: int = 1536,
                 host: str = "localhost", port: int = 6333):
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = collection_name
        self.embedding_dim = embedding_dim
        # Create collection if it doesn't exist
        collections = [c.name for c in self.client.get_collections().collections]
        if collection_name not in collections:
            self.client.create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(
                    size=embedding_dim,
                    distance=Distance.COSINE
                )
            )
            print(f"Created collection: {collection_name}")

    def add_chunks(self, chunks: List[Dict], embeddings: np.ndarray):
        """Store each chunk dict as the payload of its embedding vector.

        Every point gets a fresh random UUID, so indexing the same chunks
        twice stores them twice.

        Raises:
            ValueError: If the number of chunks and embeddings differ.
        """
        # zip() would silently drop the surplus items; fail loudly instead.
        if len(chunks) != len(embeddings):
            raise ValueError(
                f"Got {len(chunks)} chunks but {len(embeddings)} embeddings"
            )
        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=embedding.tolist(),
                payload=chunk
            )
            for chunk, embedding in zip(chunks, embeddings)
        ]
        # Upsert in batches
        step = self._UPSERT_BATCH_SIZE
        for i in range(0, len(points), step):
            self.client.upsert(
                collection_name=self.collection_name,
                points=points[i:i + step]
            )
        print(f"Added {len(points)} chunks to vector store")

    def search(self, query_embedding: np.ndarray, limit: int = 10, filter_dict: dict = None):
        """Return up to *limit* stored chunks closest to the query vector.

        Args:
            query_embedding: Embedding of the query text.
            limit: Maximum number of hits to return.
            filter_dict: Optional payload key/value pairs; all must match.

        Returns:
            A list of dicts holding the hit's payload keys plus "score".
        """
        search_filter = None
        if filter_dict:
            conditions = [
                FieldCondition(key=k, match=MatchValue(value=v))
                for k, v in filter_dict.items()
            ]
            search_filter = Filter(must=conditions)
        # NOTE(review): recent qdrant-client releases appear to deprecate
        # search() in favour of query_points() - confirm before upgrading.
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_embedding.tolist(),
            query_filter=search_filter,
            limit=limit,
            with_payload=True
        )
        return [
            {"text": r.payload["text"], "score": r.score, **r.payload}
            for r in results
        ]
Step 4: Retrieval with Reranking
Reranking significantly improves RAG quality:
python
import os

import cohere


class RAGRetriever:
    """Vector search followed by Cohere reranking.

    Args:
        vector_store: Store that is searched for candidate chunks.
        embedder: Generator used to embed the query text.
        cohere_api_key: Cohere API key. When omitted, the CO_API_KEY
            environment variable is used.

    Raises:
        ValueError: If no API key is passed and CO_API_KEY is not set.
    """

    def __init__(self, vector_store: VectorStore, embedder: EmbeddingGenerator,
                 cohere_api_key: str = None):
        self.vector_store = vector_store
        self.embedder = embedder
        # Never ship a key literal in source; take it from the caller or env.
        api_key = cohere_api_key or os.environ.get("CO_API_KEY")
        if not api_key:
            raise ValueError(
                "Cohere API key missing: pass cohere_api_key or set CO_API_KEY"
            )
        self.cohere_client = cohere.Client(api_key)

    def retrieve(self, query: str, n_retrieve: int = 20, n_final: int = 5, filter_dict: dict = None):
        """Return the *n_final* most relevant chunks for *query*.

        Args:
            query: User question.
            n_retrieve: Number of candidates fetched from the vector store.
            n_final: Number of chunks kept after reranking.
            filter_dict: Optional payload filter passed to the vector store.

        Returns:
            Candidate dicts in reranked order, each extended with a
            "rerank_score" key. Empty when the search finds nothing.
        """
        # Step 1: Embed query
        query_embedding = self.embedder.embed_texts([query])[0]
        # Step 2: Retrieve candidates (retrieve more than needed)
        candidates = self.vector_store.search(
            query_embedding=query_embedding,
            limit=n_retrieve,
            filter_dict=filter_dict
        )
        if not candidates:
            return []
        # Step 3: Rerank with Cohere; r.index points back into candidates
        rerank_results = self.cohere_client.rerank(
            model="rerank-english-v3.0",
            query=query,
            documents=[c["text"] for c in candidates],
            top_n=n_final
        )
        return [
            {**candidates[r.index], "rerank_score": r.relevance_score}
            for r in rerank_results.results
        ]
Step 5: Generation with Citations
python
from openai import OpenAI


class RAGGenerator:
    """Answer questions from retrieved chunks using an OpenAI chat model."""

    def __init__(self, model: str = "gpt-4o"):
        self.client = OpenAI()
        self.model = model

    def generate(self, query: str, context_chunks: List[Dict]) -> Dict:
        """Ask the model to answer *query* using only *context_chunks*.

        Each chunk is numbered from 1 and labelled with its "source" value
        ("Unknown" when absent) so the model can cite it as [n].

        Returns:
            A dict with "answer", a "sources" list (index, source and the
            first 200 characters of each chunk) and "tokens_used".
        """
        # Format context with source labels
        numbered_blocks = [
            f"[{position}] Source: {item.get('source', 'Unknown')}\n{item['text']}"
            for position, item in enumerate(context_chunks, 1)
        ]
        context = "\n\n".join(numbered_blocks)

        system_prompt = (
            "You are a helpful assistant that answers questions based on provided context.\n"
            "Always cite your sources using [number] notation.\n"
            "If the context doesn't contain enough information, say so clearly.\n"
            "Never make up information not present in the context."
        )
        user_prompt = f"Context:\n{context}\n\nQuestion: {query}"

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            temperature=0
        )

        cited_sources = []
        for position, item in enumerate(context_chunks, 1):
            cited_sources.append({
                "index": position,
                "source": item.get("source"),
                "excerpt": item["text"][:200],
            })

        return {
            "answer": response.choices[0].message.content,
            "sources": cited_sources,
            "tokens_used": response.usage.total_tokens,
        }
# Complete RAG pipeline
class RAGPipeline:
    """Wire the embedder, vector store, retriever and generator together.

    Args:
        collection_name: Name of the vector-store collection to index into
            and query from.
    """

    def __init__(self, collection_name: str):
        self.embedder = EmbeddingGenerator()
        # Size the collection from the embedder so the two cannot drift apart
        # if the embedding model is ever changed.
        self.vector_store = VectorStore(
            collection_name,
            embedding_dim=self.embedder.dimensions,
        )
        self.retriever = RAGRetriever(self.vector_store, self.embedder)
        self.generator = RAGGenerator()

    def index_document(self, file_path: str):
        """Load, chunk, embed and store one document.

        Each chunk is tagged with the file's base name under "source".
        """
        processor = DocumentProcessor()
        text = processor.load_document(file_path)
        chunks = processor.chunk_text(text, metadata={"source": Path(file_path).name})
        embeddings = self.embedder.embed_texts([c["text"] for c in chunks])
        self.vector_store.add_chunks(chunks, embeddings)

    def query(self, question: str) -> Dict:
        """Retrieve context for *question* and return the generator's result."""
        context = self.retriever.retrieve(question)
        return self.generator.generate(question, context)


# Deploy as FastAPI
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()
rag = RAGPipeline("company_knowledge_base")


class QueryRequest(BaseModel):
    """JSON body accepted by POST /query."""

    question: str


# The handlers are plain `def` on purpose: the pipeline makes blocking calls,
# and FastAPI runs non-async handlers in a worker thread instead of on the
# event loop.
@app.post("/query")
def query(request: QueryRequest):
    """Answer the question in the request body using the RAG pipeline."""
    return rag.query(request.question)


@app.post("/index")
def index(file_path: str):
    """Index the document at *file_path* (passed as a query parameter).

    Responds with 404 when the file does not exist and 400 when the pipeline
    rejects it with a ValueError.
    """
    # SECURITY: file_path is caller-supplied and read from the server's disk.
    # Restrict it to an allow-listed directory before exposing this endpoint.
    try:
        rag.index_document(file_path)
    except FileNotFoundError as err:
        raise HTTPException(status_code=404, detail=f"File not found: {file_path}") from err
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err)) from err
    return {"status": "indexed"}
Evaluation and Quality Metrics
python
from ragas import EvaluationDataset, evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision

# Evaluate RAG quality
test_cases = [
    {"question": "What is the refund policy?", "ground_truth": "..."},
    {"question": "How long is the warranty?", "ground_truth": "..."}
]

# Run every test question through the pipeline so the metrics can compare the
# generated answer and retrieved contexts against the expected answer.
# NOTE(review): EvaluationDataset and these field names follow the ragas 0.2+
# schema - confirm against the installed ragas version.
samples = []
for case in test_cases:
    retrieved = rag.retriever.retrieve(case["question"])
    generated = rag.generator.generate(case["question"], retrieved)
    samples.append({
        "user_input": case["question"],
        "retrieved_contexts": [chunk["text"] for chunk in retrieved],
        "response": generated["answer"],
        "reference": case["ground_truth"],
    })
test_dataset = EvaluationDataset.from_list(samples)

results = evaluate(
    dataset=test_dataset,
    metrics=[faithfulness, answer_relevancy, context_precision]
)
print(results)
Key Performance Tips
Conclusion
A production RAG system requires careful attention to each component—document processing, chunking strategy, embedding quality, retrieval precision, and generation quality. The pipeline above gives you a solid foundation that you can extend with advanced features like hybrid search, query expansion, and citation verification.
Related Tools
Related Tutorials
Automatically classify, summarize, and draft replies to emails using AI
Build voice AI applications with natural-sounding TTS and custom voice cloning
Transcribe audio files, meetings, and real-time speech with Whisper