Build a RAG Pipeline

Goal: Build a Retrieval-Augmented Generation system from scratch — embed documents, search by similarity, generate answers with context. No vector database, just NumPy.

Prerequisites: Retrieval Augmented Generation, Embeddings, Text Preprocessing, Dot Product


Why RAG?

LLMs hallucinate. They can’t know about your private documents. RAG fixes this:

  1. Chunk your documents into passages
  2. Embed each chunk into a vector
  3. Retrieve the most relevant chunks for a query
  4. Generate an answer using retrieved context

Step 1: Document Chunking

import numpy as np
 
# Sample documents — pretend these are your knowledge base
documents = [
    """Neural networks consist of layers of interconnected neurons. Each neuron applies
    a weighted sum followed by a nonlinear activation function. The most common activation
    is ReLU, which outputs max(0, x). Deep networks have many layers, allowing them to
    learn hierarchical representations.""",
 
    """Backpropagation is the algorithm used to train neural networks. It computes gradients
    of the loss function with respect to each weight by applying the chain rule backwards
    through the network. The gradients are then used by an optimizer like SGD or Adam to
    update the weights.""",
 
    """Transformers replaced RNNs as the dominant architecture for sequence modeling. The key
    innovation is the self-attention mechanism, which allows each token to attend to all other
    tokens in parallel. This solves the long-range dependency problem that plagued RNNs.""",
 
    """Transfer learning involves using a model pretrained on a large dataset and fine-tuning
    it on a smaller target dataset. This is effective because early layers learn general features
    like edges and textures, while later layers learn task-specific features.""",
 
    """Gradient descent is an optimization algorithm that iteratively updates parameters in the
    direction that reduces the loss. Variants include SGD (stochastic, uses one sample), mini-batch
    (uses a subset), and Adam (adaptive learning rates with momentum). Learning rate is the most
    important hyperparameter.""",
 
    """Overfitting occurs when a model memorizes training data instead of learning general patterns.
    Signs include high training accuracy but low test accuracy. Solutions include regularization
    (L1/L2, dropout), early stopping, data augmentation, and using simpler models.""",
 
    """Convolutional neural networks use learned filters that slide across input images to detect
    features. Pooling layers reduce spatial dimensions. Modern architectures like ResNet use
    skip connections to train very deep networks (100+ layers) without gradient degradation.""",
 
    """Word embeddings map words to dense vectors where semantic similarity corresponds to vector
    proximity. Word2Vec learns embeddings by predicting context words. BERT produces contextual
    embeddings where the same word gets different vectors depending on context.""",
]
 
def chunk_text(text, chunk_size=200, overlap=50):
    """Split text into overlapping chunks by character count."""
    words = text.split()
    chunks = []
    i = 0
    while i < len(words):
        chunk_words = words[i:i + chunk_size]
        chunks.append(" ".join(chunk_words))
        i += chunk_size - overlap
    return chunks
 
# Chunk all documents
all_chunks = []
for doc in documents:
    chunks = chunk_text(doc, chunk_size=50, overlap=10)
    all_chunks.extend(chunks)
 
print(f"Documents: {len(documents)}")
print(f"Chunks: {len(all_chunks)}")
for i, chunk in enumerate(all_chunks[:3]):
    print(f"\nChunk {i}: {chunk[:100]}...")

Step 2: Embed with a Pretrained Model

from transformers import AutoTokenizer, AutoModel
import torch
 
model_name = "sentence-transformers/all-MiniLM-L6-v2"  # small, fast, good
tokenizer = AutoTokenizer.from_pretrained(model_name)
embed_model = AutoModel.from_pretrained(model_name)
embed_model.eval()
 
def embed_texts(texts, batch_size=16):
    """Embed a list of texts into vectors."""
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        tokens = tokenizer(batch, padding=True, truncation=True, max_length=256, return_tensors="pt")
        with torch.no_grad():
            output = embed_model(**tokens)
        # Mean pooling over token embeddings
        attention_mask = tokens["attention_mask"].unsqueeze(-1)
        embeddings = (output.last_hidden_state * attention_mask).sum(1) / attention_mask.sum(1)
        all_embeddings.append(embeddings.numpy())
    return np.vstack(all_embeddings)
 
# Embed all chunks
chunk_embeddings = embed_texts(all_chunks)
print(f"Embedding shape: {chunk_embeddings.shape}")  # (n_chunks, 384)

Step 3: Retrieval by Cosine Similarity

def cosine_similarity(a, b):
    """Cosine similarity between a vector and a matrix of vectors."""
    a_norm = a / (np.linalg.norm(a) + 1e-10)
    b_norm = b / (np.linalg.norm(b, axis=1, keepdims=True) + 1e-10)
    return b_norm @ a_norm
 
def retrieve(query, chunks, chunk_embeddings, top_k=3):
    """Find the top-k most relevant chunks for a query."""
    query_embedding = embed_texts([query])[0]
    similarities = cosine_similarity(query_embedding, chunk_embeddings)
    top_indices = np.argsort(similarities)[::-1][:top_k]
 
    results = []
    for idx in top_indices:
        results.append({
            "chunk": chunks[idx],
            "score": similarities[idx],
        })
    return results
 
# Test retrieval
query = "How do you prevent overfitting?"
results = retrieve(query, all_chunks, chunk_embeddings, top_k=3)
 
print(f"Query: '{query}'\n")
for i, r in enumerate(results):
    print(f"Result {i+1} (score={r['score']:.3f}):")
    print(f"  {r['chunk'][:150]}...")
    print()

Step 4: Generate an Answer

Using a local model (if you have GPU)

from transformers import pipeline
 
# Use a small instruction-following model
generator = pipeline("text-generation", model="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
                     device=0 if torch.cuda.is_available() else -1,
                     max_new_tokens=200, do_sample=False)
 
def rag_answer(query, chunks, chunk_embeddings, top_k=3):
    """Full RAG pipeline: retrieve + generate."""
    # Retrieve
    results = retrieve(query, chunks, chunk_embeddings, top_k)
    context = "\n\n".join([r["chunk"] for r in results])
 
    # Generate
    prompt = f"""<|system|>
Answer the question using ONLY the provided context. If the context doesn't contain the answer, say "I don't have enough information."
</s>
<|user|>
Context:
{context}
 
Question: {query}
</s>
<|assistant|>
"""
    response = generator(prompt)[0]["generated_text"]
    # Extract just the assistant's response
    answer = response.split("<|assistant|>")[-1].strip()
    return answer, results
 
# Test
answer, sources = rag_answer("How do you prevent overfitting?", all_chunks, chunk_embeddings)
print(f"Answer: {answer}\n")
print("Sources:")
for s in sources:
    print(f"  [{s['score']:.3f}] {s['chunk'][:80]}...")

Alternative: API-based (if no local GPU)

def rag_answer_api(query, chunks, chunk_embeddings, top_k=3):
    """RAG with an API-based LLM (placeholder — replace with your API)."""
    results = retrieve(query, chunks, chunk_embeddings, top_k)
    context = "\n\n".join([r["chunk"] for r in results])
 
    prompt = f"""Answer the question using ONLY the provided context.
If the context doesn't contain the answer, say "I don't have enough information."
 
Context:
{context}
 
Question: {query}
 
Answer:"""
 
    # Replace this with your API call:
    # response = openai.chat.completions.create(...)
    # response = anthropic.messages.create(...)
    print("Prompt that would be sent to API:")
    print(prompt)
    return prompt, results

Evaluate Retrieval Quality

# Test queries with expected chunks
test_queries = [
    ("What is backpropagation?", "backprop"),
    ("How do transformers work?", "self-attention"),
    ("What is transfer learning?", "pretrained"),
    ("What are word embeddings?", "Word2Vec"),
    ("How does gradient descent work?", "optimization"),
]
 
print("Retrieval quality check:")
for query, expected_keyword in test_queries:
    results = retrieve(query, all_chunks, chunk_embeddings, top_k=1)
    top_chunk = results[0]["chunk"].lower()
    found = expected_keyword.lower() in top_chunk
    score = results[0]["score"]
    status = "OK" if found else "MISS"
    print(f"  [{status}] score={score:.3f} | {query}")

Visualize Embeddings

from sklearn.decomposition import PCA
 
# Embed queries and chunks together
queries = [q for q, _ in test_queries]
query_embeddings = embed_texts(queries)
 
# PCA to 2D
all_emb = np.vstack([chunk_embeddings, query_embeddings])
pca = PCA(n_components=2)
coords = pca.fit_transform(all_emb)
 
chunk_coords = coords[:len(all_chunks)]
query_coords = coords[len(all_chunks):]
 
plt.figure(figsize=(10, 7))
plt.scatter(chunk_coords[:, 0], chunk_coords[:, 1], c="steelblue", s=30, alpha=0.6, label="Chunks")
plt.scatter(query_coords[:, 0], query_coords[:, 1], c="red", s=100, marker="*", label="Queries")
for i, q in enumerate(queries):
    plt.annotate(q[:30] + "...", query_coords[i], fontsize=8, alpha=0.8)
plt.legend()
plt.title("Document chunks and queries in embedding space")
plt.show()

Improving the Pipeline

Better chunking: sentence-based

def sentence_chunk(text, max_sentences=3, overlap=1):
    """Split by sentences with overlap."""
    import re
    sentences = re.split(r'(?<=[.!?])\s+', text.strip())
    chunks = []
    for i in range(0, len(sentences), max_sentences - overlap):
        chunk = " ".join(sentences[i:i + max_sentences])
        if chunk:
            chunks.append(chunk)
    return chunks

Reranking: score chunks with a cross-encoder

# Cross-encoders are more accurate than bi-encoders for reranking
# pip install sentence-transformers
# from sentence_transformers import CrossEncoder
# reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
# scores = reranker.predict([(query, chunk) for chunk in top_chunks])

Hybrid search: combine keyword + semantic

def keyword_score(query, chunk):
    """Simple keyword overlap score."""
    query_words = set(query.lower().split())
    chunk_words = set(chunk.lower().split())
    overlap = query_words & chunk_words
    return len(overlap) / len(query_words) if query_words else 0
 
def hybrid_retrieve(query, chunks, chunk_embeddings, top_k=3, alpha=0.7):
    """Combine semantic and keyword search."""
    query_embedding = embed_texts([query])[0]
    semantic_scores = cosine_similarity(query_embedding, chunk_embeddings)
    keyword_scores = np.array([keyword_score(query, c) for c in chunks])
 
    # Normalize both to [0, 1]
    sem_norm = (semantic_scores - semantic_scores.min()) / (semantic_scores.max() - semantic_scores.min() + 1e-10)
    key_norm = (keyword_scores - keyword_scores.min()) / (keyword_scores.max() - keyword_scores.min() + 1e-10)
 
    combined = alpha * sem_norm + (1 - alpha) * key_norm
    top_indices = np.argsort(combined)[::-1][:top_k]
 
    return [{"chunk": chunks[i], "score": combined[i]} for i in top_indices]

Exercises

  1. Your own knowledge base: Replace the sample documents with your vault notes. Build a RAG system that answers questions about your ML study notes.

  2. Chunk size experiment: Try chunk sizes of 50, 100, 200, 500 words. Measure retrieval quality. Smaller chunks = more precise but less context. Larger = more context but more noise.

  3. Add metadata: Store which document each chunk came from. Return source document names alongside the answer.

  4. Conversation memory: Extend the prompt to include the last 3 Q&A pairs, so follow-up questions work (“Tell me more about that”).

  5. Evaluation framework: Create 20 question-answer pairs. For each, check if the retrieved chunks contain the answer (retrieval recall) and if the generated answer is correct (answer accuracy).


This completes the tutorial series. Each tutorial builds on the vault’s theoretical foundation and gives you working code to understand the algorithms. Go build things.