Building Production RAG Pipelines: Chunking, Embeddings, and Retrieval at Scale

BackendBytes Engineering Team
18 min read

Key Takeaways

  • Semantic chunking on paragraph/section boundaries preserves context; fixed-size token splitting mid-paragraph destroys meaning and kills retrieval quality
  • Embedding versioning prevents silent corruption: store the model version in metadata with each vector; when the embedding model or its output dimensions change, old vectors become invalid and must be re-indexed
  • Retrieval degradation is invisible without an evaluation pipeline — 150+ curated golden queries catch silent regressions on day one; without them, prototype demos hide hit-rate failures for weeks
  • MMR (Maximal Marginal Relevance) reranking with keyword fallback covers two retrieval failure modes: MMR removes near-duplicate chunks from the top-K, and keyword search catches exact-term (lexical) queries that vector search misses
  • PostgreSQL + pgvector replaces hosted vector databases for most RAG systems — you get ACID, SQL joins, and one less infrastructure dependency for $50–200/mo vs $300+/mo

This is the classic RAG prototype-to-production failure pattern: a three-day build with LangChain, OpenAI embeddings, a hosted vector database, and naive fixed-size chunking at 1,000 tokens. The demo answers questions about internal documentation with surprising accuracy. Leadership approves a production rollout.

Six weeks later the system is quietly retired. Retrieval quality hovers around 40% in production — users get relevant results less than half the time. The post-mortem is always the same: chunks split mid-paragraph destroying context, the embedding pipeline silently re-ingested with mismatched dimensions, no evaluation pipeline so degradation went undetected for weeks, and the vector database running on a single node with default HNSW parameters couldn't sustain query latency under concurrent load. We've debugged variants of this multiple times. [OpenAI Embeddings API]

The prototype-to-production gap in RAG is not about choosing the right framework. It is about chunking strategies that preserve semantic boundaries, embedding pipelines that handle drift and versioning, vector indexes tuned for your actual query patterns, and evaluation systems that catch retrieval degradation before users do.

What You Need to Know

Build RAG systems that work in production by nailing three foundations: semantic chunking that respects document structure, embedding versioning with idempotent re-indexing, and a golden test set evaluated on every pipeline change. Use pgvector for storage, batch your embedding calls, and ship graceful degradation when the vector DB fails.

  • Semantic chunking: split on boundaries, not tokens; preserve structure within 512-token chunks
  • Embedding ops: version models, track provenance, re-index all chunks when updating models
  • Evaluation: build 100+ (query, expected_result) pairs; measure hit rate, precision, recall on every PR

TL;DR table

| Decision | Prototype | Production | Why |
|---|---|---|---|
| Chunking | Fixed 1000 tokens, no overlap | Semantic 512 tokens, 10-20% overlap | Semantic preserves context; fixed splits mid-sentence |
| Embedding model | OpenAI text-embedding-3-small | Same, but versioned: text-embedding-3-small::v1.0 | Version string in metadata prevents silent corruption from API updates |
| Vector DB | Hosted vector DB with defaults | PostgreSQL + pgvector, tuned HNSW params | Control index parameters, cost, availability; single dependency |
| Search | Top-10 cosine similarity | Top-20 with MMR reranking + keyword fallback | MMR increases diversity; keyword search catches lexical queries |
| Evaluation | Ad-hoc manual testing | 150+ curated golden test set; automated metrics in CI | Without measurement, regressions are invisible until users complain |
| Monitoring | Logs only | Latency, cost, cache hit rate, golden test set score | Degradation signals before impact |

Architecture Overview

A production RAG pipeline has five distinct stages, each introducing failure modes that compound:

graph LR
    Doc["Documents"] -->|"chunk"| Chunk["Chunker<br/>(semantic split)"]
    Chunk -->|"embed"| Embed["Embedding API<br/>(OpenAI)"]
    Embed -->|"store"| PGV[(pgvector<br/>+ metadata)]

    Query["User Query"] -->|"embed"| Embed2["Embedding API"]
    Embed2 -->|"cosine similarity"| PGV
    PGV -->|"top-K chunks"| Rerank["MMR Reranker"]
    Rerank -->|"context"| LLM["LLM<br/>(generation)"]
    LLM -->|"answer"| Response["Response"]

    PGV -.->|"fallback"| KW["Keyword Search"]
    KW -.-> LLM

Ingestion (one-time, then incremental updates): Documents are chunked, each chunk is embedded, and embeddings are stored with metadata (source, section title, embedding version). This is compute-intensive and happens offline. Re-ingestion must be idempotent — replacing a document should delete its old chunks and insert new ones atomically. No partial updates; the entire document is re-processed. This simplifies consistency: either all chunks for a document are the new version, or none are.

Embedding (API-dependent): Each chunk text is sent to OpenAI's embedding API[OpenAI Embeddings API]. The service caches recent embeddings in-process to reduce API cost and latency. Versioning is critical: the embedding model version is stored alongside each vector so that queries are embedded with the same model. When the model changes (a new model, a different dimensions setting, or changed behavior), old vectors become invalid and every chunk must be re-embedded with the new version.

Retrieval (query-time): User query text is embedded using the same model+version as ingestion. The embedding is used to search the vector index for top-K similar chunks. Optionally, chunks are reranked using Maximal Marginal Relevance (MMR) to increase diversity and reduce duplicate context in the result set. Typically returns 10-20 chunks; the LLM then selects the most relevant 3-5 for context.

Fallback (reliability path): If vector search returns no results (edge case), keyword search on chunk content is attempted. If the vector database is down (connection error, timeout), cached popular queries are returned. This prevents cascading failures — the system degrades gracefully rather than failing hard. The fallback is not meant to be as good as vector search, but good enough to unblock users.

Evaluation (metrics, runs in CI): A golden test set of curated (query, expected_chunk_ids) pairs is evaluated on every pipeline change. Metrics (hit rate, precision, recall, latency) are tracked to catch regressions before production. This is the critical feedback loop that prevents silent degradation. Without this, RAG quality decays silently as code changes accumulate.

// domain.go — core types flowing through the pipeline
package rag
 
import (
	"time"
	"github.com/google/uuid"
)
 
type Document struct {
	ID        uuid.UUID
	Title     string
	Content   string
	Source    string
	Metadata  map[string]string
	UpdatedAt time.Time
}
 
type Chunk struct {
	ID               uuid.UUID
	DocumentID       uuid.UUID
	Content          string
	Index            int
	Metadata         map[string]string
	Embedding        []float32
	EmbeddingVersion string  // Critical: track model version
	TokenCount       int
}
 
type SearchRequest struct {
	Query          string
	TopK           int
	MinScore       float64
	MetadataFilter map[string]string
	UseMMR         bool     // rerank with Maximal Marginal Relevance
	MMRLambda      float64  // 0 = max diversity, 1 = max relevance
}
 
type SearchResult struct {
	Chunk     Chunk
	Score     float64
	RankScore float64
}

Chunking Strategies

Chunking is the highest-leverage decision in RAG. Bad chunking cannot be compensated for by better embeddings or ranking — if the chunk doesn't contain a coherent thought, no embedding model will make it relevant.

Fixed-size chunking (naive): splits at token boundaries without regard for semantic boundaries, destroying meaning. A 512-token chunk might end mid-paragraph or mid-sentence, leaving the embedding model with context-starved vectors. This fails hard on structured documents (code, logs, tabular data), where boundaries matter semantically.

Semantic chunking respects document structure. Split on paragraph and section boundaries, then aggregate into 512-token chunks with 10-20% overlap. This preserves the semantic unit (paragraph, list item, code block) that the embedding model expects. Semantic chunking typically improves retrieval hit rate on knowledge bases because chunks represent complete thoughts — well worth the added parsing complexity. [OpenAI Embeddings API]

// chunker.go — semantic chunking
package rag
 
import (
	"fmt"
	"strings"
	"github.com/google/uuid"
	"github.com/pkoukk/tiktoken-go"
)
 
type SemanticChunker struct {
	targetSize int // tokens (e.g., 512)
	overlap    int // tokens (e.g., 50 for 10% overlap)
	enc        *tiktoken.Tiktoken
}
 
func NewSemanticChunker(targetSize, overlap int) (*SemanticChunker, error) {
	enc, err := tiktoken.EncodingForModel("text-embedding-3-small")
	if err != nil {
		return nil, fmt.Errorf("init tokenizer: %w", err)
	}
	return &SemanticChunker{
		targetSize: targetSize,
		overlap:    overlap,
		enc:        enc,
	}, nil
}
 
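// Chunk splits on paragraph boundaries, then merges paragraphs into chunks of
// roughly targetSize tokens. Trailing paragraphs of each flushed chunk are
// carried into the next one (up to c.overlap tokens) so context survives the
// boundary. A single paragraph larger than targetSize is emitted as its own
// oversized chunk; split it by sentence upstream if that matters.
func (c *SemanticChunker) Chunk(doc Document) []Chunk {
	type para struct {
		text   string
		tokens int
	}

	var chunks []Chunk
	var buf []para // paragraphs in the chunk being built
	bufTokens := 0
	fresh := 0 // paragraphs in buf that have not been emitted yet

	flush := func() {
		texts := make([]string, len(buf))
		for i, p := range buf {
			texts[i] = p.text
		}
		chunks = append(chunks, Chunk{
			ID:         uuid.New(),
			DocumentID: doc.ID,
			Content:    strings.Join(texts, "\n\n"),
			Index:      len(chunks),
			Metadata:   copyMetadata(doc.Metadata),
			TokenCount: bufTokens, // sum of paragraph tokens; separators not counted
		})
	}

	// Split by double newline (paragraph boundary)
	for _, raw := range strings.Split(doc.Content, "\n\n") {
		text := strings.TrimSpace(raw)
		if text == "" {
			continue
		}
		p := para{text: text, tokens: len(c.enc.Encode(text, nil, nil))}

		// If adding this paragraph exceeds target size, flush the buffer.
		if fresh > 0 && bufTokens+p.tokens > c.targetSize {
			flush()
			// Seed the next chunk with trailing paragraphs, up to c.overlap tokens.
			keep, keepTokens := len(buf), 0
			for keep > 0 && keepTokens+buf[keep-1].tokens <= c.overlap {
				keep--
				keepTokens += buf[keep].tokens
			}
			buf = append([]para(nil), buf[keep:]...)
			bufTokens, fresh = keepTokens, 0
		}

		buf = append(buf, p)
		bufTokens += p.tokens
		fresh++
	}

	// Flush the final buffer if it holds anything not yet emitted.
	if fresh > 0 {
		flush()
	}

	return chunks
}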
 
func copyMetadata(src map[string]string) map[string]string {
	dst := make(map[string]string, len(src))
	for k, v := range src {
		dst[k] = v
	}
	return dst
}

Decision framework: Use semantic chunking for documents with explicit structure (internal docs, blogs, knowledge bases, technical specs). Use fixed-size chunking with 50-100 tokens of overlap for homogeneous free text such as support tickets. Always measure hit rate on your golden test set: the choice is data-driven, not heuristic. The semantic chunker adds complexity, so measure before optimizing.

In our experience, semantic chunking improves hit rate by 15-25% on knowledge bases because it respects document structure (chapters, sections, lists stay together). Fixed-size chunking is simpler to implement and works adequately on unstructured text, but fails hard on structured data where boundaries have semantic meaning. Start with fixed-size, measure hit rate, then switch to semantic if the metric is below your threshold (we target 75% hit rate as a baseline). The break-even point in our corpora is roughly 50k chunks: below that, the improvement rarely justifies the complexity of semantic chunking; above that, it compounds.
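For comparison with the semantic chunker, the fixed-size baseline recommended as a starting point can be sketched in a few lines. This is a minimal sketch under stated assumptions: `fixedSizeChunks` is a hypothetical helper, and whitespace-separated words stand in for real tokenizer output so the example runs without dependencies.

```go
package main

import (
	"fmt"
	"strings"
)

// fixedSizeChunks slides a window of `size` tokens over the input, stepping by
// size-overlap so that consecutive chunks share `overlap` tokens.
// Assumption: tokens are whitespace-separated words, not model tokens.
func fixedSizeChunks(text string, size, overlap int) []string {
	if size <= 0 || overlap < 0 || overlap >= size {
		return nil // invalid window configuration
	}
	tokens := strings.Fields(text)
	step := size - overlap

	var chunks []string
	for start := 0; start < len(tokens); start += step {
		end := start + size
		if end > len(tokens) {
			end = len(tokens)
		}
		chunks = append(chunks, strings.Join(tokens[start:end], " "))
		if end == len(tokens) {
			break // the last window already reached the end of the text
		}
	}
	return chunks
}

func main() {
	text := "t0 t1 t2 t3 t4 t5 t6 t7 t8 t9"
	for i, c := range fixedSizeChunks(text, 4, 1) {
		fmt.Printf("chunk %d: %s\n", i, c)
	}
}
```

Note that the window ignores paragraph boundaries entirely, which is exactly the behavior the golden test set should be used to evaluate before switching strategies.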

Embedding Model Selection and Versioning

OpenAI's text-embedding-3-small is a common default: cost-efficient (~$0.02 per 1M tokens) with proven retrieval quality on real corpora. The default output is 1536-dimensional vectors; the API also exposes a dimensions parameter that uses Matryoshka representation learning to truncate to smaller sizes (e.g. 512 or 768) at a small accuracy cost, which is useful when storage or index-build time dominates your cost[OpenAI Embeddings API]. Pick a dimension once, version it, and never silently change it: model behaviour updates can corrupt your vector store.

Embedding versioning is mandatory. Track the model version with each embedding in metadata. When OpenAI updates the model (changing dimensions, behavior, or cost), you must re-embed all chunks with the new version. Without versioning, old embeddings become incompatible with new query embeddings, destroying cosine similarity calculations. The embedding vector space is not a fixed contract — it evolves with model updates.

// embedder.go — embedding with versioning and caching
package rag
 
import (
	"context"
	"fmt"
	"os"
	"sync"
	"time"
 
	"github.com/sashabaranov/go-openai"
)
 
const EmbeddingVersion = "text-embedding-3-small::20240415"
 
type EmbeddingService struct {
	client   *openai.Client
	version  string
	cache    map[string][]float32
	cacheMu  sync.RWMutex
	hitRate  float64
}
 
func NewEmbeddingService() *EmbeddingService {
	return &EmbeddingService{
		client:  openai.NewClient(os.Getenv("OPENAI_API_KEY")),
		version: EmbeddingVersion,
		cache:   make(map[string][]float32),
	}
}
 
// Embed calls OpenAI with in-memory caching and version tracking.
// Returns embeddings aligned to the input order of texts, so callers can
// assign results by index. Cache hits and freshly fetched vectors are both
// written back to their original position — never appended in arrival order.
func (es *EmbeddingService) Embed(ctx context.Context, texts []string) ([][]float32, error) {
	embeddings := make([][]float32, len(texts))
	var toEmbed []string
	var toEmbedIdx []int // original index in texts for each uncached entry
	hitCount := 0
 
	// Check cache, recording the input position of every miss.
	es.cacheMu.RLock()
	for i, text := range texts {
		if emb, ok := es.cache[text]; ok {
			embeddings[i] = emb
			hitCount++
		} else {
			toEmbed = append(toEmbed, text)
			toEmbedIdx = append(toEmbedIdx, i)
		}
	}
	es.cacheMu.RUnlock()
 
	// Call API for uncached texts (batch up to 2048 per OpenAI limits)
	if len(toEmbed) > 0 {
		resp, err := es.client.CreateEmbeddings(ctx, openai.EmbeddingRequest{
			Input: toEmbed,
			Model: openai.SmallEmbedding3,
		})
		if err != nil {
			return nil, fmt.Errorf("embedding API: %w", err)
		}
		if len(resp.Data) != len(toEmbed) {
			return nil, fmt.Errorf("embedding API returned %d vectors for %d inputs", len(resp.Data), len(toEmbed))
		}
 
		// Cache each vector and place it at its ORIGINAL input index.
		// OpenAI does not guarantee resp.Data is returned in request order — each
		// item carries an Index into the request Input — so key by data.Index.
		es.cacheMu.Lock()
		for _, data := range resp.Data {
			embedding := make([]float32, len(data.Embedding))
			for j, v := range data.Embedding {
				embedding[j] = float32(v)
			}
			es.cache[toEmbed[data.Index]] = embedding
			embeddings[toEmbedIdx[data.Index]] = embedding
		}
		es.cacheMu.Unlock()
	}
 
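	// Track cache hit rate for monitoring. Embed is called concurrently during
	// batch ingestion, so the write is guarded by the same mutex as the cache.
	if len(texts) > 0 {
		es.cacheMu.Lock()
		es.hitRate = float64(hitCount) / float64(len(texts))
		es.cacheMu.Unlock()
	}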
 
	return embeddings, nil
}
 
// Version returns the embedding model version (stored in chunk metadata).
func (es *EmbeddingService) Version() string {
	return es.version
}

In-memory cache in our deployments typically lands at 30-50% hit rate on ingestion batches and 60%+ on repeated queries, which directly cuts embedding API spend (~$0.02 per 1M tokens[OpenAI Embeddings API]). The cache is application-level; for multi-instance deployments add Redis or memcached. As an order-of-magnitude example: 1M chunks of ~512 tokens × 2 ingestions/month is about 1B tokens, or roughly $20/month uncached, so a 40% cache hit rate saves about $8/month. That is small in isolation but compounds across services.
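The versioning rule from this section can also be enforced as a cheap guard before any similarity computation. The following is a minimal sketch, not the article's implementation: `storedVector`, `checkCompatible`, and `errIncompatible` are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// storedVector is a hypothetical, trimmed-down view of a stored chunk row.
type storedVector struct {
	Version   string
	Embedding []float32
}

// errIncompatible marks a vector that must not be compared with the query.
var errIncompatible = errors.New("embedding incompatible with query model")

// checkCompatible rejects vectors produced by a different model version or
// with a different dimensionality than the query embedding.
func checkCompatible(v storedVector, queryVersion string, queryDim int) error {
	if v.Version != queryVersion {
		return fmt.Errorf("%w: stored version %q, query version %q",
			errIncompatible, v.Version, queryVersion)
	}
	if len(v.Embedding) != queryDim {
		return fmt.Errorf("%w: stored dimension %d, query dimension %d",
			errIncompatible, len(v.Embedding), queryDim)
	}
	return nil
}

func main() {
	current := "text-embedding-3-small::20240415"
	stale := storedVector{Version: "text-embedding-ada-002::legacy", Embedding: make([]float32, 1536)}
	fmt.Println(checkCompatible(stale, current, 1536))
}
```

In the SQL path the same rule is expressed by filtering on embedding_version; a guard like this is mainly useful for vectors held in caches or passed between services.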

Vector Database with PostgreSQL and pgvector

pgvector is production-grade[pgvector README]: low operational overhead (runs in the same PostgreSQL instance as your application), cost-effective, and supports hybrid (vector + keyword) search within SQL. HNSW indexing keeps retrieval latency in the low milliseconds for corpora up to ~10M vectors in our experience.

Schema design captures provenance and enables efficient querying:

-- chunks table with pgvector
CREATE TABLE chunks (
    id uuid PRIMARY KEY,
    document_id uuid NOT NULL,
    content text NOT NULL,
    chunk_index int NOT NULL,
    embedding vector(1536),  -- OpenAI text-embedding-3-small
    embedding_version varchar(50) NOT NULL,  -- e.g., "text-embedding-3-small::20240415"
    token_count int,
    metadata jsonb,  -- {source, category, author, chunk_section}
    created_at timestamp DEFAULT now(),
    updated_at timestamp DEFAULT now()
);
 
-- HNSW index for vector search (m=16, ef_construction=200)
-- pgvector's defaults are m=16, ef_construction=64; a higher ef_construction
-- builds a better graph (higher recall) at the cost of slower index builds and inserts
CREATE INDEX idx_chunks_embedding ON chunks USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);
 
-- Metadata index for filtering
CREATE INDEX idx_chunks_metadata ON chunks USING gin(metadata);
 
-- Partial index for current embedding version (ignore deprecated)
CREATE INDEX idx_chunks_current_version ON chunks (embedding_version)
WHERE embedding_version = 'text-embedding-3-small::20240415';
 
-- DocumentID index for deletion (idempotent re-ingestion)
CREATE INDEX idx_chunks_document_id ON chunks(document_id);

Retrieval with fallback and graceful degradation:

// retriever.go
package rag
 
import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log/slog"

	pgvector "github.com/pgvector/pgvector-go"
)

type ChunkRepository struct {
	db     *sql.DB
	logger *slog.Logger
}

// Search retrieves the top-k chunks by cosine similarity, with optional
// exact-match metadata filters. It does not fall back on its own: callers
// handle empty results and connection errors (see SearchWithFallback).
func (r *ChunkRepository) Search(
	ctx context.Context,
	queryEmbedding []float32,
	topK int,
	minScore float64,
	filters map[string]string,
) ([]SearchResult, error) {
	vec := pgvector.NewVector(queryEmbedding)

	// <=> is pgvector's cosine distance operator; similarity = 1 - distance.
	query := `
		SELECT id, document_id, content, metadata, COALESCE(token_count, 0),
		       1 - (embedding <=> $1::vector) AS similarity
		FROM chunks
		WHERE embedding_version = $2
		AND 1 - (embedding <=> $1::vector) >= $3
	`
	// Use the shared constant so ingestion and retrieval cannot drift apart.
	args := []any{vec, EmbeddingVersion, minScore}

	// Metadata filters (exact match). Keys are bound as parameters too:
	// interpolating them into the SQL string would allow SQL injection.
	for key, val := range filters {
		n := len(args)
		query += fmt.Sprintf(" AND metadata->>$%d::text = $%d", n+1, n+2)
		args = append(args, key, val)
	}

	query += fmt.Sprintf(" ORDER BY embedding <=> $1::vector LIMIT $%d", len(args)+1)
	args = append(args, topK)

	rows, err := r.db.QueryContext(ctx, query, args...)
	if err != nil {
		r.logger.Error("vector search failed", "error", err)
		return nil, fmt.Errorf("query error: %w", err)
	}
	defer rows.Close()

	var results []SearchResult
	for rows.Next() {
		var sr SearchResult
		var rawMeta []byte // jsonb arrives as bytes; database/sql cannot scan into a map
		if err := rows.Scan(&sr.Chunk.ID, &sr.Chunk.DocumentID, &sr.Chunk.Content,
			&rawMeta, &sr.Chunk.TokenCount, &sr.Score); err != nil {
			return nil, fmt.Errorf("scan chunk: %w", err)
		}
		if len(rawMeta) > 0 {
			if err := json.Unmarshal(rawMeta, &sr.Chunk.Metadata); err != nil {
				return nil, fmt.Errorf("decode metadata: %w", err)
			}
		}
		results = append(results, sr)
	}

	return results, rows.Err()
}

Production Patterns

Pattern 1: Embedding Cache with LRU Eviction

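import (
	"container/list"
	"sync"
)

// EmbeddingCache is a fixed-capacity LRU cache keyed by chunk text.
type EmbeddingCache struct {
	mu      sync.Mutex
	items   map[string]*list.Element // text -> node in order
	order   *list.List               // front = most recently used
	maxSize int
}

type cacheEntry struct {
	text string
	emb  []float32
}

func NewEmbeddingCache(maxSize int) *EmbeddingCache {
	return &EmbeddingCache{
		items:   make(map[string]*list.Element),
		order:   list.New(),
		maxSize: maxSize,
	}
}

func (ec *EmbeddingCache) GetOrCompute(text string, fn func() ([]float32, error)) ([]float32, error) {
	ec.mu.Lock()
	if el, ok := ec.items[text]; ok {
		ec.order.MoveToFront(el) // a hit refreshes recency
		emb := el.Value.(cacheEntry).emb
		ec.mu.Unlock()
		return emb, nil
	}
	ec.mu.Unlock()

	// Compute outside the lock so a slow API call does not block other callers.
	// Two goroutines missing on the same text may both call fn.
	emb, err := fn()
	if err != nil {
		return nil, err
	}

	ec.mu.Lock()
	defer ec.mu.Unlock()
	if el, ok := ec.items[text]; ok { // filled by another goroutine meanwhile
		ec.order.MoveToFront(el)
		return el.Value.(cacheEntry).emb, nil
	}
	ec.items[text] = ec.order.PushFront(cacheEntry{text: text, emb: emb})

	// Evict least recently used entries once over capacity.
	for ec.order.Len() > ec.maxSize {
		oldest := ec.order.Back()
		ec.order.Remove(oldest)
		delete(ec.items, oldest.Value.(cacheEntry).text)
	}
	return emb, nil
}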

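Cost impact: at $0.02/1M tokens, a 40-50% cache hit rate brings the effective price to about $0.010-0.012/1M tokens. With 1M chunks of ~512 tokens × 2 ingestions/month (about 1B tokens, roughly $20/month uncached), the cache saves ~$8-10/month, which scales to ~$80-100/month at 10M chunks. [Dean & Barroso, 2013]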
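The cost figures are easy to re-derive for your own corpus. The sketch below is illustrative only: `embeddingSpend` is a hypothetical helper, and the 512 tokens per chunk is an assumption taken from the chunk size used elsewhere in this article.

```go
package main

import "fmt"

// embeddingSpend returns the uncached monthly embedding cost in dollars and
// the portion avoided by a cache with the given hit rate (0.0 to 1.0).
func embeddingSpend(chunks, tokensPerChunk, ingestionsPerMonth int,
	pricePerMillion, cacheHitRate float64) (total, saved float64) {
	tokens := float64(chunks) * float64(tokensPerChunk) * float64(ingestionsPerMonth)
	total = tokens / 1e6 * pricePerMillion
	saved = total * cacheHitRate
	return total, saved
}

func main() {
	// Assumed inputs: 1M chunks, 512 tokens each, 2 ingestions/month,
	// $0.02 per 1M tokens, 40% cache hit rate.
	total, saved := embeddingSpend(1_000_000, 512, 2, 0.02, 0.40)
	fmt.Printf("uncached: $%.2f/month, saved by cache: $%.2f/month\n", total, saved)
}
```

Under these assumptions the uncached spend is about $20/month and the cache avoids about $8 of it; both scale linearly with chunk count.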

Pattern 2: Graceful Degradation

When the vector DB is unavailable, fall back to pre-computed popular results:

func (r *ChunkRepository) SearchWithFallback(
	ctx context.Context,
	queryEmbedding []float32,
	topK int,
) ([]SearchResult, error) {
	results, err := r.Search(ctx, queryEmbedding, topK, 0.5, nil)
	if err != nil && isConnectionError(err) {
		// Vector DB is down; return cached top-100 popular chunks
		r.logger.Warn("vector DB unavailable, using fallback cache")
		return r.cachedPopularResults(), nil
	}
	if err != nil {
		return nil, err
	}
	return results, nil
}
 
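// isConnectionError reports whether err looks like a network-level failure.
// Typed checks come first; the string match is a last resort for drivers that
// do not wrap the underlying error. Needs "context", "errors", "net", "strings".
func isConnectionError(err error) bool {
	var netErr net.Error
	if errors.As(err, &netErr) || errors.Is(err, context.DeadlineExceeded) {
		return true
	}
	s := err.Error()
	return strings.Contains(s, "connection refused") ||
		strings.Contains(s, "i/o timeout") ||
		strings.Contains(s, "connection reset")
}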

Benefit: a degraded response (cached popular results, clearly flagged as degraded) beats a 500 Internal Server Error. Users understand "search is loading" better than "system failed."
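The full degradation order described in this article (vector search, then keyword search when vector search finds nothing, then cached popular results when the database is down) can be sketched as a small fallback chain. This is a minimal sketch under stated assumptions: `searchFn`, `retrieve`, and `errBackendDown` are hypothetical names, and any vector-search error is treated as an outage, whereas the code above only falls back on connection errors.

```go
package main

import (
	"errors"
	"fmt"
)

// searchFn is a hypothetical signature standing in for any retrieval backend.
type searchFn func(query string) ([]string, error)

// errBackendDown simulates an unavailable vector database in the demo.
var errBackendDown = errors.New("vector backend unavailable")

// retrieve tries vector search first, falls back to keyword search when the
// vector search succeeds but finds nothing, and to cached popular results
// when the vector search fails. The bool reports whether the answer is degraded.
func retrieve(query string, vector, keyword searchFn, cached []string) ([]string, bool, error) {
	results, err := vector(query)
	if err != nil {
		if len(cached) > 0 {
			return cached, true, nil
		}
		return nil, true, fmt.Errorf("vector search failed and no cache available: %w", err)
	}
	if len(results) > 0 {
		return results, false, nil
	}
	kw, err := keyword(query)
	if err != nil {
		return nil, true, fmt.Errorf("keyword fallback: %w", err)
	}
	return kw, true, nil
}

func main() {
	down := func(string) ([]string, error) { return nil, errBackendDown }
	keyword := func(string) ([]string, error) { return []string{"kw-chunk"}, nil }

	results, degraded, err := retrieve("reset password", down, keyword, []string{"popular-chunk"})
	fmt.Println(results, degraded, err)
}
```

Returning the degraded flag lets the HTTP layer tell users that results are approximate instead of silently serving lower-quality answers.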

Pattern 3: Reranking with Maximal Marginal Relevance

Balance relevance and diversity in the top-K set:

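// mmr.go
package rag

import "math"

// ComputeMMR greedily selects up to topK results, scoring each candidate as
// lambda*relevance - (1-lambda)*(max similarity to anything already selected).
// Chunk.Embedding must be populated on every result; with empty embeddings the
// diversity term is always 0 and the output is plain relevance order.
func ComputeMMR(results []SearchResult, lambda float64, topK int) []SearchResult {
	selected := []SearchResult{}
	// Copy first: removing picks in place would scramble the caller's slice.
	remaining := append([]SearchResult(nil), results...)

	for len(selected) < topK && len(remaining) > 0 {
		bestIdx := 0
		bestScore := lambda*remaining[0].Score - (1-lambda)*maxSimilarityToSelected(remaining[0], selected)

		for i := 1; i < len(remaining); i++ {
			score := lambda*remaining[i].Score - (1-lambda)*maxSimilarityToSelected(remaining[i], selected)
			if score > bestScore {
				bestScore = score
				bestIdx = i
			}
		}

		selected = append(selected, remaining[bestIdx])
		remaining = append(remaining[:bestIdx], remaining[bestIdx+1:]...)
	}

	return selected
}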
 
func cosineSimilarity(a, b []float32) float64 {
	var dot float64
	var normA, normB float64
	for i := range a {
		dot += float64(a[i]) * float64(b[i])
		normA += float64(a[i]) * float64(a[i])
		normB += float64(b[i]) * float64(b[i])
	}
	if normA == 0 || normB == 0 {
		return 0
	}
	return dot / (math.Sqrt(normA) * math.Sqrt(normB))
}
 
func maxSimilarityToSelected(r SearchResult, selected []SearchResult) float64 {
	maxSim := 0.0
	for _, s := range selected {
		sim := cosineSimilarity(r.Chunk.Embedding, s.Chunk.Embedding)
		if sim > maxSim {
			maxSim = sim
		}
	}
	return maxSim
}

When to use: Set lambda = 0.6 (60% relevance, 40% diversity) to reduce duplicate content in results. Typical use case: multiple chunks from the same section or adjacent paragraphs. Without MMR, all top-10 results might come from one article. [OWASP LLM Top 10]
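To see the effect of lambda concretely, here is a small self-contained run of the same greedy scoring rule on three toy candidates. It is an illustrative sketch: the `candidate` type, the two-dimensional embeddings, and the scores are made up for the example, with B deliberately a near-duplicate of A.

```go
package main

import (
	"fmt"
	"math"
)

// candidate is a toy stand-in for a search result (hypothetical type).
type candidate struct {
	ID        string
	Score     float64 // relevance to the query
	Embedding []float64
}

func cosine(a, b []float64) float64 {
	if len(a) != len(b) {
		return 0
	}
	var dot, na, nb float64
	for i := range a {
		dot += a[i] * b[i]
		na += a[i] * a[i]
		nb += b[i] * b[i]
	}
	if na == 0 || nb == 0 {
		return 0
	}
	return dot / (math.Sqrt(na) * math.Sqrt(nb))
}

// mmr picks k candidates, each time maximizing
// lambda*relevance - (1-lambda)*maxSimilarityToAlreadySelected.
func mmr(cands []candidate, lambda float64, k int) []candidate {
	remaining := append([]candidate(nil), cands...) // leave the input untouched
	var selected []candidate
	for len(selected) < k && len(remaining) > 0 {
		best, bestScore := 0, math.Inf(-1)
		for i, c := range remaining {
			maxSim := 0.0
			for _, s := range selected {
				if sim := cosine(c.Embedding, s.Embedding); sim > maxSim {
					maxSim = sim
				}
			}
			if score := lambda*c.Score - (1-lambda)*maxSim; score > bestScore {
				best, bestScore = i, score
			}
		}
		selected = append(selected, remaining[best])
		remaining = append(remaining[:best], remaining[best+1:]...)
	}
	return selected
}

func main() {
	cands := []candidate{
		{ID: "A", Score: 0.90, Embedding: []float64{1, 0}},
		{ID: "B", Score: 0.85, Embedding: []float64{1, 0.05}}, // near-duplicate of A
		{ID: "C", Score: 0.70, Embedding: []float64{0, 1}},    // different topic
	}
	for _, lambda := range []float64{1.0, 0.6} {
		picked := mmr(cands, lambda, 2)
		fmt.Printf("lambda=%.1f -> %s, %s\n", lambda, picked[0].ID, picked[1].ID)
	}
}
```

With lambda = 1.0 the second pick is B (pure relevance); with lambda = 0.6 the similarity penalty pushes B below C, so the result set covers two topics instead of one.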

Evaluation Pipeline

Build a golden test set of 100-150 curated (query, expected_chunk_ids) pairs. Evaluate on every pipeline change (chunking strategy, embedding model, index parameters). Without measurement, regressions are invisible until users complain.

// eval_test.go
package rag
 
import (
	"context"
	"testing"
	"time"
)
 
type EvalCase struct {
	Query            string
	ExpectedChunkIDs []string
	Category         string
}
 
func TestRetrievalQuality(t *testing.T) {
	goldenSet := loadGoldenTestSet("testdata/golden_evals.json")
 
	var hitRates, precisions, recalls []float64
	var latencies []time.Duration
 
	for _, tc := range goldenSet {
		emb, err := embedder.Embed(context.Background(), []string{tc.Query})
		if err != nil {
			t.Fatalf("embed: %v", err)
		}
 
		start := time.Now()
		results, err := repo.Search(context.Background(), emb[0], 10, 0.5, nil)
		latencies = append(latencies, time.Since(start))
		if err != nil {
			t.Fatalf("search: %v", err)
		}
 
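		// Compute metrics. A "hit" means at least one expected chunk was retrieved;
		// looping also avoids indexing into an empty ExpectedChunkIDs slice.
		retrieved := extractIDs(results)
		hit := false
		for _, id := range tc.ExpectedChunkIDs {
			if contains(retrieved, id) {
				hit = true
				break
			}
		}
		hitRates = append(hitRates, boolToFloat(hit))

		prec := precision(retrieved, tc.ExpectedChunkIDs)
		rec := recall(retrieved, tc.ExpectedChunkIDs) // named rec so it does not shadow recall()
		precisions = append(precisions, prec)
		recalls = append(recalls, rec)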
	}
 
	// Assert thresholds (fail if regression detected)
	avgHit := avg(hitRates)
	if avgHit < 0.75 {
		t.Errorf("hit rate %.3f below threshold 0.75", avgHit)
	}
 
	avgPrec := avg(precisions)
	if avgPrec < 0.65 {
		t.Errorf("precision %.3f below threshold 0.65", avgPrec)
	}
 
	p99Latency := percentile(latencies, 99)
	if p99Latency > 500*time.Millisecond {
		t.Errorf("p99 latency %v exceeds 500ms threshold", p99Latency)
	}
 
	t.Logf("Hit Rate: %.3f | Precision: %.3f | Recall: %.3f | p99 Latency: %v",
		avgHit, avgPrec, avg(recalls), p99Latency)
}
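The test above calls helpers such as precision and recall without showing them. One plausible shape for them, assuming simple set-based definitions over chunk IDs, is sketched below; these are illustrative implementations, not necessarily the ones used in the original codebase.

```go
package main

import "fmt"

// toSet builds a lookup set from a list of chunk IDs.
func toSet(ids []string) map[string]bool {
	set := make(map[string]bool, len(ids))
	for _, id := range ids {
		set[id] = true
	}
	return set
}

// precision is the fraction of retrieved IDs that were expected.
// Assumes retrieved contains no duplicate IDs.
func precision(retrieved, expected []string) float64 {
	if len(retrieved) == 0 {
		return 0
	}
	exp := toSet(expected)
	relevant := 0
	for _, id := range retrieved {
		if exp[id] {
			relevant++
		}
	}
	return float64(relevant) / float64(len(retrieved))
}

// recall is the fraction of expected IDs that were retrieved.
func recall(retrieved, expected []string) float64 {
	if len(expected) == 0 {
		return 0
	}
	got := toSet(retrieved)
	found := 0
	for _, id := range expected {
		if got[id] {
			found++
		}
	}
	return float64(found) / float64(len(expected))
}

// hit reports whether at least one expected ID was retrieved.
func hit(retrieved, expected []string) bool {
	return recall(retrieved, expected) > 0
}

func main() {
	retrieved := []string{"a", "b", "c", "d"}
	expected := []string{"b", "x"}
	fmt.Printf("precision=%.2f recall=%.2f hit=%v\n",
		precision(retrieved, expected), recall(retrieved, expected), hit(retrieved, expected))
}
```

With these definitions, precision over 10 retrieved chunks cannot exceed 0.1 when a query has a single expected chunk, so a precision threshold should be chosen with the size of the expected set in mind.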

Ingestion Pipeline

Ingestion is where document insertion meets embedding parallelization. Idempotent re-ingestion (replacing a document deletes its old chunks and inserts new ones atomically) is required for document updates. Batching embedding calls is critical for throughput and for staying within API rate limits.

// ingest.go
package rag
 
import (
	"context"
	"fmt"
	"log/slog"
	"time"
)
 
type Ingester struct {
	chunker  *SemanticChunker
	embedder *EmbeddingService
	repo     *ChunkRepository
	logger   *slog.Logger
}
 
func NewIngester(
	chunker *SemanticChunker,
	embedder *EmbeddingService,
	repo *ChunkRepository,
	logger *slog.Logger,
) *Ingester {
	return &Ingester{
		chunker:  chunker,
		embedder: embedder,
		repo:     repo,
		logger:   logger,
	}
}
 
// Ingest processes a document: chunk → embed → replace stored chunks (idempotent).
// Old chunks are deleted only after embedding succeeds, so a failed API call
// never leaves the document without chunks. For a truly atomic swap, run
// DeleteByDocument and BatchUpsert inside one database transaction.
func (ing *Ingester) Ingest(ctx context.Context, doc Document) error {
	start := time.Now()

	// Step 1: Chunk the document
	chunks := ing.chunker.Chunk(doc)
	if len(chunks) == 0 {
		ing.logger.Warn("document produced no chunks",
			"document_id", doc.ID,
			"title", doc.Title)
		// Still remove stale chunks so an emptied document stops matching queries.
		if err := ing.repo.DeleteByDocument(ctx, doc.ID); err != nil {
			return fmt.Errorf("delete existing chunks: %w", err)
		}
		return nil
	}

	// Step 2: Embed chunks in batches (OpenAI limit: 2048 inputs per request)
	const embedBatchSize = 256
	for i := 0; i < len(chunks); i += embedBatchSize {
		end := i + embedBatchSize
		if end > len(chunks) {
			end = len(chunks)
		}

		texts := make([]string, end-i)
		for j := i; j < end; j++ {
			texts[j-i] = chunks[j].Content
		}

		embeddings, err := ing.embedder.Embed(ctx, texts)
		if err != nil {
			return fmt.Errorf("embed batch %d-%d: %w", i, end, err)
		}

		for j, emb := range embeddings {
			chunks[i+j].Embedding = emb
			chunks[i+j].EmbeddingVersion = ing.embedder.Version()
		}
	}

	// Step 3: Delete existing chunks (idempotent update)
	if err := ing.repo.DeleteByDocument(ctx, doc.ID); err != nil {
		return fmt.Errorf("delete existing chunks: %w", err)
	}

	// Step 4: Store the new chunks
	if err := ing.repo.BatchUpsert(ctx, chunks); err != nil {
		return fmt.Errorf("store chunks: %w", err)
	}
 
	ing.logger.Info("document ingested",
		"document_id", doc.ID,
		"title", doc.Title,
		"chunks", len(chunks),
		"duration", time.Since(start),
	)
 
	return nil
}
 
// IngestBatch processes multiple documents concurrently with bounded parallelism.
func (ing *Ingester) IngestBatch(
	ctx context.Context, docs []Document, concurrency int,
) error {
	sem := make(chan struct{}, concurrency)
	errCh := make(chan error, len(docs))
 
	for _, doc := range docs {
		sem <- struct{}{}
		go func(d Document) {
			defer func() { <-sem }()
			if err := ing.Ingest(ctx, d); err != nil {
				errCh <- fmt.Errorf("ingest %s: %w", d.ID, err)
			}
		}(doc)
	}
 
	// Wait for all goroutines to finish
	for i := 0; i < cap(sem); i++ {
		sem <- struct{}{}
	}
	close(errCh)
 
	var errs []error
	for err := range errCh {
		errs = append(errs, err)
	}
 
	if len(errs) > 0 {
		return fmt.Errorf("%d documents failed ingestion: %w", len(errs), errs[0])
	}
	return nil
}

Batching strategy: Embed 256 chunks per API call (OpenAI supports 2048). Embedding is billed per token, so batching does not change the token cost; it cuts the request count by ~256x, which reduces per-request overhead and rate-limit pressure. On 100k chunks, 256-per-batch = 391 API calls vs 100,000 calls. [OpenAI Embeddings API]
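The request-count arithmetic can be checked with a small helper that also yields the slice bounds for each batch. This is an illustrative sketch; `batchRanges` is a hypothetical name, not part of the ingestion code above.

```go
package main

import "fmt"

// batchRanges returns [start, end) index pairs that cover n items in batches
// of at most size items each.
func batchRanges(n, size int) [][2]int {
	if n <= 0 || size <= 0 {
		return nil
	}
	ranges := make([][2]int, 0, (n+size-1)/size)
	for start := 0; start < n; start += size {
		end := start + size
		if end > n {
			end = n
		}
		ranges = append(ranges, [2]int{start, end})
	}
	return ranges
}

func main() {
	ranges := batchRanges(100_000, 256)
	last := ranges[len(ranges)-1]
	fmt.Printf("%d requests; last batch covers [%d, %d)\n", len(ranges), last[0], last[1])
}
```

For 100,000 chunks this gives 390 full batches plus one final batch of 160 chunks.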

Deployment Checklist

Before production, verify:

| Check | How | Why |
|---|---|---|
| Embedding versioning | Every chunk: embedding_version = "text-embedding-3-small::20240415" | Prevents silent vector store corruption on API updates |
| Re-embed capability | Can re-embed all chunks within 24 hours | Model upgrades require full re-indexing |
| Golden test set | Minimum 100 curated (query, expected_chunks) pairs | Unlocks quality metrics |
| Eval in CI | Retrieval tests run on every PR; block if hit rate drops >5% | Catches regressions from code changes |
| Graceful degradation | Vector DB down → cached top-10 results, not 500 error | Better UX; users understand "search loading" |
| Index tuning | HNSW m=16, ef_construction=200 for 1M chunks | Latency vs. recall tradeoff; adjust for your scale |
| Rate limits | Embedding calls batched (256 per request); quota monitored | Prevents one noisy tenant from exhausting API quota |
| Cost tracking | Tokens logged per query and ingestion batch | Embedding + generation costs compound |

Start Simple, Measure, Iterate

The most common mistake in RAG systems is over-engineering the initial implementation. Start with fixed-size chunking at 512 tokens, OpenAI text-embedding-3-small, pgvector with default HNSW parameters, and top-10 retrieval. Build the evaluation pipeline on day one. Measure hit rate, precision, and recall. Only then optimize: switch to semantic chunking if chunk boundaries are the bottleneck, increase HNSW m if recall is low, add hybrid search if keyword queries underperform. Every optimization should be justified by a metric improvement on your golden test set.
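Justifying each optimization by a metric implies comparing every run against a stored baseline. A minimal sketch of such a gate follows, assuming the checklist's "hit rate drops >5%" means five absolute percentage points; the `metrics` type and `regressions` function are hypothetical names for illustration.

```go
package main

import "fmt"

// metrics holds the aggregate scores from one golden-set run (hypothetical type).
type metrics struct {
	HitRate, Precision, Recall float64
}

// regressions lists every metric that dropped by more than maxDrop (absolute)
// relative to the baseline. An empty result means the change may ship.
func regressions(baseline, current metrics, maxDrop float64) []string {
	var out []string
	check := func(name string, base, cur float64) {
		if base-cur > maxDrop {
			out = append(out, fmt.Sprintf("%s: %.3f -> %.3f", name, base, cur))
		}
	}
	check("hit_rate", baseline.HitRate, current.HitRate)
	check("precision", baseline.Precision, current.Precision)
	check("recall", baseline.Recall, current.Recall)
	return out
}

func main() {
	baseline := metrics{HitRate: 0.80, Precision: 0.30, Recall: 0.70}
	current := metrics{HitRate: 0.74, Precision: 0.30, Recall: 0.69}
	for _, r := range regressions(baseline, current, 0.05) {
		fmt.Println("regression:", r)
	}
}
```

Storing the baseline alongside the golden test set keeps the comparison reproducible across branches.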

The difference between a RAG demo and a RAG system is the evaluation pipeline, operational instrumentation, and graceful degradation paths. Ship simple, measure rigorously, iterate on the metrics.


Frequently Asked Questions

Why do RAG prototypes fail in production?

Prototypes use naive fixed-size chunking that splits mid-paragraph destroying context, lack embedding versioning so API updates silently corrupt the vector store, have no evaluation pipeline to detect retrieval degradation, and use default index parameters that cannot sustain concurrent query load.

What chunking strategy works best for RAG?

Use semantic chunking that splits on paragraph and section boundaries rather than fixed token counts. Preserve document structure (headings, lists) within chunks, add overlap between chunks (10-20%) to maintain context at boundaries, and include metadata (source, section title) with each chunk for filtering. [pgvector README]

How do you evaluate RAG retrieval quality?

Build an evaluation pipeline with a curated set of question-answer pairs. Measure retrieval precision (are the retrieved chunks relevant?) and recall (are relevant chunks being found?). Run evaluations on every pipeline change — chunking strategy, embedding model, or index parameter update — to catch regressions before users do.

Why use Go instead of Python for RAG pipelines?

The RAG pipeline — ingestion, chunking, embedding API calls, vector DB operations, HTTP serving — is I/O-bound infrastructure code. Go's concurrency model, static typing, and single-binary deployment optimize for reliability and operational simplicity. The LLM and embedding models run remotely via API, so your language choice for pipeline orchestration does not need ML library access.
