Building a RAG System from Scratch: Complete Python Tutorial 2026

Build a production-quality Retrieval Augmented Generation system step by step, from document processing to API deployment

返回教程列表
高级 · 45 分钟

Building a RAG System from Scratch: Complete Python Tutorial 2026

Build a production-quality Retrieval Augmented Generation system step by step, from document processing to API deployment

Complete hands-on tutorial for building a RAG (Retrieval Augmented Generation) system from scratch in Python. Covers document chunking, embedding generation, vector storage, retrieval optimization, reranking, and building a production API.

rag · python · vector-database · openai · production · tutorial

Building a RAG System from Scratch: Complete Python Tutorial 2026

Retrieval Augmented Generation (RAG) is the most widely deployed LLM architecture pattern in enterprise AI. Instead of relying on an LLM's training knowledge, RAG systems retrieve relevant information from your own data at query time—enabling accurate, up-to-date answers with citations.

This tutorial builds a complete RAG system from the ground up.

Architecture Overview


[Documents] → [Chunker] → [Embedder] → [Vector DB]
                                              ↓
[User Query] → [Query Embedder] → [Retriever] → [Reranker] → [LLM] → [Answer]

Step 1: Document Processing

python
from pathlib import Path
from typing import List, Dict
import re

class DocumentProcessor:
    """Load documents and split their text into overlapping chunks.

    Chunk size and overlap are measured in whitespace-separated words,
    not in model tokens.
    """

    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 128):
        """Configure the chunker.

        Args:
            chunk_size: Maximum number of words per chunk (a single sentence
                longer than this is still kept whole in one chunk).
            chunk_overlap: Number of trailing words of a finished chunk that
                are repeated at the start of the next one.

        Raises:
            ValueError: If ``chunk_size`` is not positive, or ``chunk_overlap``
                is negative or not smaller than ``chunk_size``.
        """
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")
        if not 0 <= chunk_overlap < chunk_size:
            raise ValueError(
                "chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size, "
                f"got chunk_overlap={chunk_overlap}, chunk_size={chunk_size}"
            )
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def load_document(self, path: str) -> str:
        """Return the text content of a .pdf, .md, .txt or .docx file.

        Raises:
            ValueError: If the file extension is not one of the supported ones.
        """
        path = Path(path)
        # Compare case-insensitively so "REPORT.PDF" is handled like "report.pdf".
        suffix = path.suffix.lower()
        if suffix == ".pdf":
            return self._load_pdf(path)
        if suffix in (".md", ".txt"):
            return path.read_text(encoding="utf-8")
        if suffix == ".docx":
            return self._load_docx(path)
        raise ValueError(f"Unsupported format: {path.suffix}")

    def _load_pdf(self, path: Path) -> str:
        """Extract the text of every page, separated by blank lines."""
        import pypdf

        reader = pypdf.PdfReader(str(path))
        # Guard against pages that yield no text so the join never sees None.
        return "\n\n".join(page.extract_text() or "" for page in reader.pages)

    def _load_docx(self, path: Path) -> str:
        """Join all non-empty paragraphs of a Word document with blank lines."""
        import docx

        doc = docx.Document(str(path))
        return "\n\n".join(para.text for para in doc.paragraphs if para.text.strip())

    def clean_text(self, text: str) -> str:
        """Collapse excess whitespace and drop lines that hold only digits."""
        # Remove excessive whitespace
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r' {2,}', ' ', text)
        # Remove page numbers (a line consisting solely of digits)
        text = re.sub(r'\n\d+\n', '\n', text)
        return text.strip()

    def chunk_text(self, text: str, metadata: dict = None) -> List[Dict]:
        """Split text into overlapping chunks on sentence boundaries.

        Sentences are accumulated until adding the next one would exceed
        ``chunk_size`` words; the chunk is then emitted and its last
        ``chunk_overlap`` words seed the next chunk.

        Args:
            text: Raw document text; it is cleaned with ``clean_text`` first.
            metadata: Optional extra keys merged into every chunk dict.

        Returns:
            A list of dicts with ``"text"`` and ``"chunk_index"`` plus the
            metadata keys. Empty or whitespace-only text yields ``[]``.
        """
        text = self.clean_text(text)
        # Split on sentence boundaries
        sentences = re.split(r'(?<=[.!?])\s+', text)
        extra = metadata or {}

        chunks = []
        current_words = []
        for sentence in sentences:
            words = sentence.split()
            if current_words and len(current_words) + len(words) > self.chunk_size:
                chunks.append({
                    "text": " ".join(current_words),
                    "chunk_index": len(chunks),
                    **extra,
                })
                # Carry over the configured overlap. A plain [-0:] slice would
                # keep the whole chunk, so zero overlap is handled explicitly.
                if self.chunk_overlap:
                    current_words = current_words[-self.chunk_overlap:]
                else:
                    current_words = []
            current_words.extend(words)

        if current_words:
            chunks.append({
                "text": " ".join(current_words),
                "chunk_index": len(chunks),
                **extra,
            })
        return chunks

# Usage

# Build a processor with chunk_size=400 and chunk_overlap=80.
processor = DocumentProcessor(chunk_size=400, chunk_overlap=80)

# Read the handbook PDF into one text string.
doc_text = processor.load_document("company_handbook.pdf")

# Chunk it; every chunk carries the same source/type metadata.
chunks = processor.chunk_text(
    doc_text,
    metadata={"source": "handbook", "type": "policy"},
)
print(f"Generated {len(chunks)} chunks")

Step 2: Embedding Generation

python
from openai import OpenAI
import numpy as np
from typing import List
import time

class EmbeddingGenerator:
    """Generate OpenAI embeddings for lists of text, batched and with retries."""

    # Output sizes of the OpenAI embedding models known to this class.
    _MODEL_DIMENSIONS = {
        "text-embedding-3-small": 1536,
        "text-embedding-3-large": 3072,
        "text-embedding-ada-002": 1536,
    }

    def __init__(self, model: str = "text-embedding-3-small"):
        """Create the OpenAI client and record the model and its vector size.

        Args:
            model: Name of the embedding model to request.
        """
        self.client = OpenAI()
        self.model = model
        # Unknown model names fall back to the name-based guess
        # (small=1536, anything else=3072).
        self.dimensions = self._MODEL_DIMENSIONS.get(
            model, 1536 if "small" in model else 3072
        )

    def embed_texts(
        self,
        texts: List[str],
        batch_size: int = 100,
        max_retries: int = 3,
    ) -> np.ndarray:
        """Generate embeddings with batching and retry logic.

        Args:
            texts: Strings to embed; newlines are replaced by spaces first.
            batch_size: Number of texts sent per API request.
            max_retries: Total attempts per batch before the last error is
                re-raised.

        Returns:
            An array with one row per input text. Empty input returns an
            array of shape ``(0, self.dimensions)`` without calling the API.

        Raises:
            ValueError: If ``batch_size`` or ``max_retries`` is not positive.
            Exception: Whatever the client raised on the final failed attempt.
        """
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        if max_retries <= 0:
            raise ValueError(f"max_retries must be positive, got {max_retries}")
        if not texts:
            return np.empty((0, self.dimensions))

        total = len(texts)
        all_embeddings = []
        for start in range(0, total, batch_size):
            # Clean for embedding
            batch = [t.replace("\n", " ") for t in texts[start:start + batch_size]]
            all_embeddings.extend(self._embed_batch(batch, max_retries))
            print(f"Embedded {min(start + batch_size, total)}/{total} texts")
        return np.array(all_embeddings)

    def _embed_batch(self, batch: List[str], max_retries: int) -> list:
        """Embed one batch, sleeping 1s, 2s, 4s, ... between failed attempts."""
        attempt = 0
        while True:
            try:
                response = self.client.embeddings.create(
                    input=batch,
                    model=self.model,
                )
            except Exception:
                # Broad on purpose: this is the retry boundary, and the final
                # failure is re-raised unchanged instead of being swallowed.
                attempt += 1
                if attempt >= max_retries:
                    raise
                time.sleep(2 ** (attempt - 1))
            else:
                return [item.embedding for item in response.data]

# Module-level embedder configured for the "text-embedding-3-small" model.
embedder = EmbeddingGenerator(model="text-embedding-3-small")

Step 3: Vector Storage with Qdrant

python
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct,
    Filter, FieldCondition, MatchValue
)
import uuid

class VectorStore:
    """Thin wrapper around one Qdrant collection of chunk embeddings."""

    def __init__(
        self,
        collection_name: str,
        embedding_dim: int = 1536,
        host: str = "localhost",
        port: int = 6333,
    ):
        """Connect to Qdrant and create the collection if it is missing.

        Args:
            collection_name: Name of the collection to use.
            embedding_dim: Vector size used when the collection is created.
            host: Qdrant server host.
            port: Qdrant server port.
        """
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = collection_name
        self.embedding_dim = embedding_dim

        # Create collection if it doesn't exist
        existing = {c.name for c in self.client.get_collections().collections}
        if collection_name not in existing:
            self.client.create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(
                    size=embedding_dim,
                    distance=Distance.COSINE,
                ),
            )
            print(f"Created collection: {collection_name}")

    def add_chunks(self, chunks: List[Dict], embeddings: np.ndarray, batch_size: int = 100):
        """Store each chunk with its embedding under a fresh random UUID.

        Args:
            chunks: Chunk dicts; each one becomes the point payload as-is.
            embeddings: One vector per chunk, in the same order.
            batch_size: Number of points sent per upsert request.

        Raises:
            ValueError: If the number of chunks and embeddings differ, or
                ``batch_size`` is not positive.
        """
        # zip() would silently drop the surplus items, losing data unnoticed.
        if len(chunks) != len(embeddings):
            raise ValueError(
                f"Got {len(chunks)} chunks but {len(embeddings)} embeddings"
            )
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")

        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=embedding.tolist(),
                payload=chunk,
            )
            for chunk, embedding in zip(chunks, embeddings)
        ]

        # Upsert in batches
        for start in range(0, len(points), batch_size):
            self.client.upsert(
                collection_name=self.collection_name,
                points=points[start:start + batch_size],
            )
        print(f"Added {len(points)} chunks to vector store")

    def search(self, query_embedding: np.ndarray, limit: int = 10, filter_dict: dict = None):
        """Return the ``limit`` nearest chunks, optionally filtered by payload.

        Args:
            query_embedding: Query vector.
            limit: Maximum number of hits to return.
            filter_dict: Optional ``{payload_key: value}`` pairs; a hit must
                match all of them exactly.

        Returns:
            A list of dicts holding the hit's payload keys plus ``"score"``
            (a payload key named ``"score"`` takes precedence).
        """
        search_filter = None
        if filter_dict:
            conditions = [
                FieldCondition(key=k, match=MatchValue(value=v))
                for k, v in filter_dict.items()
            ]
            search_filter = Filter(must=conditions)

        # NOTE(review): newer qdrant-client releases appear to deprecate
        # `search` in favor of `query_points` — confirm against the installed
        # version before upgrading.
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_embedding.tolist(),
            query_filter=search_filter,
            limit=limit,
            with_payload=True,
        )
        return [
            {"text": r.payload["text"], "score": r.score, **r.payload}
            for r in results
        ]

Step 4: Retrieval with Reranking

Reranking significantly improves RAG quality:

python
import cohere

class RAGRetriever:
    """Two-stage retriever: vector search for candidates, then Cohere rerank."""

    def __init__(
        self,
        vector_store: "VectorStore",
        embedder: "EmbeddingGenerator",
        cohere_api_key: str = None,
    ):
        """Store the collaborators and build the Cohere client.

        Args:
            vector_store: Store queried for candidate chunks.
            embedder: Generator used to embed the query text.
            cohere_api_key: Cohere API key. When omitted, the
                ``COHERE_API_KEY`` and then ``CO_API_KEY`` environment
                variables are consulted.

        Raises:
            ValueError: If no API key is supplied or found in the environment.
        """
        import os

        self.vector_store = vector_store
        self.embedder = embedder

        # Secrets come from the caller or the environment, never from a
        # literal committed to source.
        api_key = (
            cohere_api_key
            or os.environ.get("COHERE_API_KEY")
            or os.environ.get("CO_API_KEY")
        )
        if not api_key:
            raise ValueError(
                "Cohere API key missing: pass cohere_api_key or set the "
                "COHERE_API_KEY environment variable"
            )
        self.cohere_client = cohere.Client(api_key)

    def retrieve(self, query: str, n_retrieve: int = 20, n_final: int = 5, filter_dict: dict = None):
        """Return the ``n_final`` most relevant chunks for ``query``.

        Args:
            query: Natural-language question.
            n_retrieve: Number of vector-search candidates passed to the
                reranker.
            n_final: Number of chunks kept after reranking.
            filter_dict: Optional payload filter forwarded to the store.

        Returns:
            Candidate dicts in reranker order, each with an added
            ``"rerank_score"``; ``[]`` when the search finds nothing.
        """
        # Step 1: Embed query
        query_embedding = self.embedder.embed_texts([query])[0]

        # Step 2: Retrieve candidates (retrieve more than needed)
        candidates = self.vector_store.search(
            query_embedding=query_embedding,
            limit=n_retrieve,
            filter_dict=filter_dict,
        )
        if not candidates:
            return []

        # Step 3: Rerank with Cohere
        rerank_results = self.cohere_client.rerank(
            model="rerank-english-v3.0",
            query=query,
            documents=[c["text"] for c in candidates],
            top_n=n_final,
        )
        # r.index points back into the candidate list that was sent.
        return [
            {**candidates[r.index], "rerank_score": r.relevance_score}
            for r in rerank_results.results
        ]

Step 5: Generation with Citations

python
from openai import OpenAI

class RAGGenerator:
    """Answer questions from retrieved chunks using an OpenAI chat model."""

    _SYSTEM_PROMPT = (
        "You are a helpful assistant that answers questions based on provided context. "
        "Always cite your sources using [number] notation. "
        "If the context doesn't contain enough information, say so clearly. "
        "Never make up information not present in the context."
    )

    # Returned without calling the model when there is nothing to ground on.
    _NO_CONTEXT_ANSWER = (
        "I could not find any relevant information in the indexed documents "
        "to answer this question."
    )

    def __init__(self, model: str = "gpt-4o"):
        """Create the OpenAI client and remember the chat model name."""
        self.client = OpenAI()
        self.model = model

    @staticmethod
    def _format_context(context_chunks: List[Dict]) -> str:
        """Render chunks as numbered, source-labelled blocks for the prompt."""
        return "\n\n".join(
            f"[{i}] Source: {chunk.get('source', 'Unknown')}\n{chunk['text']}"
            for i, chunk in enumerate(context_chunks, 1)
        )

    def generate(self, query: str, context_chunks: List[Dict]) -> Dict:
        """Produce a cited answer to ``query`` from ``context_chunks``.

        Args:
            query: The user's question.
            context_chunks: Retrieved chunk dicts; each needs ``"text"`` and
                may carry ``"source"``.

        Returns:
            A dict with ``"answer"``, ``"sources"`` (index, source and the
            first 200 characters of each chunk) and ``"tokens_used"``. With
            no context chunks, a fixed "nothing found" answer is returned and
            no model call is made.
        """
        if not context_chunks:
            # Skip the request: the prompt forbids answering without context,
            # so the call could only spend tokens.
            return {
                "answer": self._NO_CONTEXT_ANSWER,
                "sources": [],
                "tokens_used": 0,
            }

        context = self._format_context(context_chunks)
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self._SYSTEM_PROMPT},
                {
                    "role": "user",
                    "content": f"Context:\n{context}\n\nQuestion: {query}",
                },
            ],
            temperature=0,
        )

        # Usage is read defensively in case the response carries none.
        usage = getattr(response, "usage", None)
        return {
            "answer": response.choices[0].message.content,
            "sources": [
                {"index": i + 1, "source": c.get("source"), "excerpt": c["text"][:200]}
                for i, c in enumerate(context_chunks)
            ],
            "tokens_used": usage.total_tokens if usage else 0,
        }

# Complete RAG pipeline

class RAGPipeline:
    """Wire the embedder, vector store, retriever and generator together."""

    def __init__(self, collection_name: str):
        """Build every component; the store is bound to ``collection_name``."""
        embedder = EmbeddingGenerator()
        store = VectorStore(collection_name)

        self.embedder = embedder
        self.vector_store = store
        self.retriever = RAGRetriever(store, embedder)
        self.generator = RAGGenerator()

    def index_document(self, file_path: str):
        """Load, chunk, embed and store the document at ``file_path``.

        Each chunk is tagged with the file's base name under ``"source"``.
        """
        doc_processor = DocumentProcessor()
        raw_text = doc_processor.load_document(file_path)

        source_tag = {"source": Path(file_path).name}
        pieces = doc_processor.chunk_text(raw_text, metadata=source_tag)

        piece_texts = [piece["text"] for piece in pieces]
        vectors = self.embedder.embed_texts(piece_texts)
        self.vector_store.add_chunks(pieces, vectors)

    def query(self, question: str) -> Dict:
        """Retrieve context for ``question`` and return the generator's result."""
        retrieved = self.retriever.retrieve(question)
        return self.generator.generate(question, retrieved)

# Deploy as FastAPI

from fastapi import FastAPI
from pydantic import BaseModel

# Application object that the route decorators register against.
app = FastAPI()

# One pipeline instance, created at import time and shared by all requests.
rag = RAGPipeline("company_knowledge_base")

class QueryRequest(BaseModel):
    """Request body for the query endpoint."""

    # The user's natural-language question.
    question: str

@app.post("/query")
async def query(request: QueryRequest):
    """Answer the question in the request body using the shared RAG pipeline.

    Returns:
        The dict produced by ``rag.query`` for ``request.question``.
    """
    from fastapi.concurrency import run_in_threadpool

    # rag.query is synchronous. Calling it directly inside a coroutine would
    # stall the event loop for every other request until it finished, so it
    # runs in the worker thread pool instead.
    return await run_in_threadpool(rag.query, request.question)

@app.post("/index")
async def index(file_path: str):
    """Index the server-side document at ``file_path`` into the pipeline.

    Returns:
        ``{"status": "indexed"}`` on success.

    Raises:
        HTTPException: 404 when the file does not exist; 400 when indexing
            raises ``ValueError``.
    """
    from fastapi import HTTPException
    from fastapi.concurrency import run_in_threadpool

    # SECURITY(review): file_path comes straight from the client and is opened
    # on the server as-is, so any file readable by this process can be
    # indexed. Restrict it to an allow-listed directory (or accept uploads
    # instead) before exposing this endpoint.
    try:
        # index_document is synchronous; keep it off the event loop.
        await run_in_threadpool(rag.index_document, file_path)
    except FileNotFoundError as err:
        raise HTTPException(
            status_code=404, detail=f"File not found: {file_path}"
        ) from err
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err)) from err
    return {"status": "indexed"}

Evaluation and Quality Metrics

python
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision

# Evaluate RAG quality

from ragas import EvaluationDataset

# Questions paired with their reference answers; replace the "..."
# placeholders with real ground truth before running.
test_cases = [
    {"question": "What is the refund policy?", "ground_truth": "..."},
    {"question": "How long is the warranty?", "ground_truth": "..."},
]

# Run every test question through the pipeline so the metrics can compare the
# generated answer and retrieved contexts against the reference answer.
eval_rows = []
for case in test_cases:
    retrieved = rag.retriever.retrieve(case["question"])
    generated = rag.generator.generate(case["question"], retrieved)
    # NOTE(review): these column names follow the EvaluationDataset schema of
    # ragas 0.2+ — confirm against the installed ragas version.
    eval_rows.append({
        "user_input": case["question"],
        "retrieved_contexts": [chunk["text"] for chunk in retrieved],
        "response": generated["answer"],
        "reference": case["ground_truth"],
    })

# The dataset is now built from test_cases; the name was previously undefined.
test_dataset = EvaluationDataset.from_list(eval_rows)

results = evaluate(
    dataset=test_dataset,
    metrics=[faithfulness, answer_relevancy, context_precision],
)
print(results)

Key Performance Tips

  • Chunk size matters: 256-512 tokens for factual Q&A, 1024+ for summaries
  • Always rerank: Cohere reranker improves precision by 15-25%
  • Hybrid search: Combine vector + BM25 keyword search for best results
  • Chunk overlap: 15-20% overlap prevents cutting important context at boundaries
  • Metadata filtering: Filter by date, source, category before vector search
Conclusion

    A production RAG system requires careful attention to each component—document processing, chunking strategy, embedding quality, retrieval precision, and generation quality. The pipeline above gives you a solid foundation that you can extend with advanced features like hybrid search, query expansion, and citation verification.

    相关工具

    openai · qdrant · cohere · python