Building Production RAG Pipelines: Chunking, Embeddings, and Retrieval at Scale
Key Takeaways
- Semantic chunking on paragraph/section boundaries preserves context; fixed-size token splitting mid-paragraph destroys meaning and kills retrieval quality
- Embedding versioning prevents silent corruption: store the model version in metadata with each vector; when the model or its dimensions change, old vectors become invalid and must be re-indexed
- Retrieval degradation is invisible without an evaluation pipeline: 150+ curated golden queries catch silent regressions on day one; without them, prototype demos hide hit-rate failures for weeks
- MMR (Maximal Marginal Relevance) reranking with keyword fallback covers two retrieval failure modes: MMR removes near-duplicate chunks from the top-K, and keyword search catches exact-term (lexical) queries that vector search misses
- PostgreSQL + pgvector replaces hosted vector databases for most RAG systems: you get ACID, SQL joins, and one less infrastructure dependency for $50–200/mo vs $300+/mo
The classic RAG prototype-to-production failure pattern starts with a three-day build: LangChain, OpenAI embeddings, a hosted vector database, and naive fixed-size chunking at 1,000 tokens. The demo answers questions about internal documentation with surprising accuracy. Leadership approves a production rollout.
Six weeks later the system is quietly retired. Retrieval quality hovers around 40% in production — users get relevant results less than half the time. The post-mortem is always the same: chunks split mid-paragraph destroying context, the embedding pipeline silently re-ingested with mismatched dimensions, no evaluation pipeline so degradation went undetected for weeks, and the vector database running on a single node with default HNSW parameters couldn't sustain query latency under concurrent load. We've debugged variants of this multiple times. [OpenAI Embeddings API]
The prototype-to-production gap in RAG is not about choosing the right framework. It is about chunking strategies that preserve semantic boundaries, embedding pipelines that handle drift and versioning, vector indexes tuned for your actual query patterns, and evaluation systems that catch retrieval degradation before users do.
Build RAG systems that work in production by nailing three foundations: semantic chunking that respects document structure, embedding versioning with idempotent re-indexing, and a golden test set evaluated on every pipeline change. Use pgvector for storage, batch your embedding calls, and ship graceful degradation when the vector DB fails.
- Semantic chunking: split on boundaries, not tokens; preserve structure within 512-token chunks
- Embedding ops: version models, track provenance, re-index all chunks when updating models
- Evaluation: build 100+ (query, expected_result) pairs; measure hit rate, precision, recall on every PR
TL;DR table
| Decision | Prototype | Production | Why |
|---|---|---|---|
| Chunking | Fixed 1000 tokens, no overlap | Semantic 512 tokens, 10-20% overlap | Semantic preserves context; fixed splits mid-sentence |
| Embedding model | OpenAI text-embedding-3-small | Same, but versioned: text-embedding-3-small::20240415 | Version string in metadata prevents silent corruption from API updates |
| Vector DB | Hosted vector DB with defaults | PostgreSQL + pgvector, tuned HNSW params | Control index parameters, cost, availability; single dependency |
| Search | Top-10 cosine similarity | Top-20 with MMR reranking + keyword fallback | MMR increases diversity; keyword search catches lexical queries |
| Evaluation | Ad-hoc manual testing | 150+ curated golden test set; automated metrics in CI | Without measurement, regressions are invisible until users complain |
| Monitoring | Logs only | Latency, cost, cache hit rate, golden test set score | Degradation signals before impact |
Architecture Overview
A production RAG pipeline has five distinct stages, each introducing failure modes that compound:
graph LR
Doc["Documents"] -->|"chunk"| Chunk["Chunker<br/>(semantic split)"]
Chunk -->|"embed"| Embed["Embedding API<br/>(OpenAI)"]
Embed -->|"store"| PGV[(pgvector<br/>+ metadata)]
Query["User Query"] -->|"embed"| Embed2["Embedding API"]
Embed2 -->|"cosine similarity"| PGV
PGV -->|"top-K chunks"| Rerank["MMR Reranker"]
Rerank -->|"context"| LLM["LLM<br/>(generation)"]
LLM -->|"answer"| Response["Response"]
PGV -.->|"fallback"| KW["Keyword Search"]
KW -.-> LLM
Ingestion (one-time, then incremental updates): Documents are chunked, each chunk is embedded, and embeddings are stored with metadata (source, section title, embedding version). This is compute-intensive and happens offline. Re-ingestion must be idempotent — replacing a document should delete its old chunks and insert new ones atomically. No partial updates; the entire document is re-processed. This simplifies consistency: either all chunks for a document are the new version, or none are.
Embedding (API-dependent): Each chunk text is sent to OpenAI's embedding API[OpenAI Embeddings API]. The service caches recent embeddings in-process to reduce API cost and latency. Versioning is critical — the embedding model version is stored alongside the embedding vector so that later queries use the same model. When OpenAI updates the model (changing dimensions or behavior), you must re-embed all chunks with the new version, and old vectors become invalid.
Retrieval (query-time): User query text is embedded using the same model+version as ingestion. The embedding is used to search the vector index for top-K similar chunks. Optionally, chunks are reranked using Maximal Marginal Relevance (MMR) to increase diversity and reduce duplicate context in the result set. Typically returns 10-20 chunks; the LLM then selects the most relevant 3-5 for context.
Fallback (reliability path): If vector search returns no results (edge case), keyword search on chunk content is attempted. If the vector database is down (connection error, timeout), cached popular queries are returned. This prevents cascading failures — the system degrades gracefully rather than failing hard. The fallback is not meant to be as good as vector search, but good enough to unblock users.
Evaluation (metrics, runs in CI): A golden test set of curated (query, expected_chunk_ids) pairs is evaluated on every pipeline change. Metrics (hit rate, precision, recall, latency) are tracked to catch regressions before production. This is the critical feedback loop that prevents silent degradation. Without this, RAG quality decays silently as code changes accumulate.
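The fallback stage described above can be sketched as a small decision chain. This is a minimal illustration, not the article's implementation: `ErrUnavailable`, `searchFunc`, and `retrieve` are hypothetical names, and the three search paths are passed in as plain functions so the ordering logic can be read on its own.

```go
package main

import "errors"

// ErrUnavailable is a hypothetical sentinel a vector-store client would
// return (or wrap) when the database cannot be reached.
var ErrUnavailable = errors.New("vector store unavailable")

// searchFunc returns chunk IDs for a query.
type searchFunc func(query string) ([]string, error)

// retrieve tries vector search first. An outage falls back to cached popular
// results; an empty result set falls back to keyword search. The second
// return value names the path that produced the results, which is useful
// for monitoring how often the system runs degraded.
func retrieve(query string, vector, keyword searchFunc, cached func() []string) ([]string, string, error) {
	ids, err := vector(query)
	switch {
	case errors.Is(err, ErrUnavailable):
		return cached(), "cache", nil
	case err != nil:
		return nil, "", err
	case len(ids) > 0:
		return ids, "vector", nil
	}
	ids, err = keyword(query)
	if err != nil {
		return nil, "", err
	}
	return ids, "keyword", nil
}
```

Returning the path name alongside the results is a design choice of this sketch: it makes degraded responses visible in logs and metrics instead of looking like normal traffic.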
// domain.go — core types flowing through the pipeline
package rag
import (
"time"
"github.com/google/uuid"
)
type Document struct {
ID uuid.UUID
Title string
Content string
Source string
Metadata map[string]string
UpdatedAt time.Time
}
type Chunk struct {
ID uuid.UUID
DocumentID uuid.UUID
Content string
Index int
Metadata map[string]string
Embedding []float32
EmbeddingVersion string // Critical: track model version
TokenCount int
}
type SearchRequest struct {
Query string
TopK int
MinScore float64
MetadataFilter map[string]string
UseMMR bool
MMRLambda float64 // 0 = max diversity, 1 = max relevance
}
type SearchResult struct {
Chunk Chunk
Score float64
RankScore float64
}
Chunking Strategies
Chunking is the highest-leverage decision in RAG. Bad chunking cannot be compensated for by better embeddings or ranking: if the chunk doesn't contain a coherent thought, no embedding model will make it relevant.
Fixed-size chunking (naive): splits at token boundaries without regard for semantic boundaries, destroying meaning. A 512-token chunk might end mid-paragraph or mid-sentence, leaving the embedding model with context-starved vectors. This fails hard on structured documents (code, logs, tabular data), where boundaries matter semantically.
Semantic chunking respects document structure. Split on paragraph and section boundaries, then aggregate into 512-token chunks with 10-20% overlap. This preserves the semantic unit (paragraph, list item, code block) that the embedding model expects. Semantic chunking typically improves retrieval hit rate on knowledge bases because chunks represent complete thoughts — well worth the added parsing complexity. [OpenAI Embeddings API]
// chunker.go — semantic chunking
package rag
import (
"fmt"
"strings"
"github.com/google/uuid"
"github.com/pkoukk/tiktoken-go"
)
type SemanticChunker struct {
targetSize int // tokens (e.g., 512)
overlap int // tokens (e.g., 50 for ~10% of 512); reserved, Chunk below does not apply it yet
enc *tiktoken.Tiktoken
}
func NewSemanticChunker(targetSize, overlap int) (*SemanticChunker, error) {
enc, err := tiktoken.EncodingForModel("text-embedding-3-small")
if err != nil {
return nil, fmt.Errorf("init tokenizer: %w", err)
}
return &SemanticChunker{
targetSize: targetSize,
overlap: overlap,
enc: enc,
}, nil
}
// Chunk splits on paragraph boundaries, then merges into target-sized chunks.
func (c *SemanticChunker) Chunk(doc Document) []Chunk {
// Split by double newline (paragraph boundary)
paras := strings.Split(doc.Content, "\n\n")
var chunks []Chunk
var buffer strings.Builder
bufferTokens := 0
for _, para := range paras {
para = strings.TrimSpace(para)
if para == "" {
continue
}
paraTokens := len(c.enc.Encode(para, nil, nil))
// If adding this paragraph exceeds target size, flush buffer.
if bufferTokens+paraTokens > c.targetSize && buffer.Len() > 0 {
chunks = append(chunks, Chunk{
ID: uuid.New(),
DocumentID: doc.ID,
Content: buffer.String(),
Index: len(chunks),
Metadata: copyMetadata(doc.Metadata),
TokenCount: bufferTokens,
})
buffer.Reset()
bufferTokens = 0
}
// Add paragraph to buffer; count the separator only when one is written.
if buffer.Len() > 0 {
buffer.WriteString("\n\n")
bufferTokens += 2 // rough allowance for the "\n\n" separator
}
buffer.WriteString(para)
bufferTokens += paraTokens
}
// Flush final buffer
if buffer.Len() > 0 {
chunks = append(chunks, Chunk{
ID: uuid.New(),
DocumentID: doc.ID,
Content: buffer.String(),
Index: len(chunks),
Metadata: copyMetadata(doc.Metadata),
TokenCount: bufferTokens,
})
}
return chunks
}
func copyMetadata(src map[string]string) map[string]string {
dst := make(map[string]string, len(src))
for k, v := range src {
dst[k] = v
}
return dst
}
Decision framework: Use semantic chunking for documents with explicit structure (internal docs, blogs, knowledge bases, technical specs). Use fixed-size with overlap (50-100 tokens overlap) for homogeneous text (support tickets, logs, raw data). Always measure hit rate on your golden test set: the choice is data-driven, not heuristic. The semantic chunker adds complexity; measure first before optimizing.
In our experience, semantic chunking improves hit rate by 15-25% on knowledge bases because it respects document structure (chapters, sections, lists stay together). Fixed-size chunking is simpler to implement and works adequately on unstructured text, but fails hard on structured data where boundaries have semantic meaning. Start with fixed-size, measure hit rate, then switch to semantic if the metric is below your threshold (we target 75% hit rate as a baseline). The break-even point in our corpora is roughly 50k chunks: below that, the added complexity of semantic chunking is rarely worth the improvement; above that, the gain compounds.
Embedding Model Selection and Versioning
OpenAI's text-embedding-3-small is the current industry standard: cost-efficient (~$0.02 per 1M tokens) and proven retrieval quality on real corpora. The default output is 1536-dimensional vectors; the API also exposes a dimensions parameter that uses Matryoshka representation learning to truncate to smaller sizes (e.g. 512 or 768) at a small accuracy cost — useful when storage or index-build time dominates your cost[OpenAI Embeddings API]. Pick a dimension once, version it, and never silently change it: model behaviour updates can corrupt your vector store.
Embedding versioning is mandatory. Track the model version with each embedding in metadata. When OpenAI updates the model (changing dimensions, behavior, or cost), you must re-embed all chunks with the new version. Without versioning, old embeddings become incompatible with new query embeddings, destroying cosine similarity calculations. The embedding vector space is not a fixed contract — it evolves with model updates.
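A small guard makes the versioning rule enforceable rather than a convention. The sketch below assumes version strings in the `model::revision` form used in this article; `ensureComparable` and `staleChunkIDs` are hypothetical helpers, not part of any library.

```go
package main

import (
	"fmt"
	"sort"
)

// ensureComparable returns an error when a stored vector and a query vector
// come from different embedding versions or have different dimensions, since
// similarity scores across vector spaces are meaningless.
func ensureComparable(storedVersion, queryVersion string, storedDim, queryDim int) error {
	if storedVersion != queryVersion {
		return fmt.Errorf("embedding version mismatch: stored %q, query %q", storedVersion, queryVersion)
	}
	if storedDim != queryDim {
		return fmt.Errorf("embedding dimension mismatch: stored %d, query %d", storedDim, queryDim)
	}
	return nil
}

// staleChunkIDs returns, in sorted order, the IDs of chunks whose stored
// version differs from current. These are the candidates for re-embedding.
func staleChunkIDs(versionByChunk map[string]string, current string) []string {
	var stale []string
	for id, v := range versionByChunk {
		if v != current {
			stale = append(stale, id)
		}
	}
	sort.Strings(stale)
	return stale
}
```

Running a check like `staleChunkIDs` before switching the query path to a new version gives a concrete re-indexing backlog instead of a silent mix of vector spaces.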
// embedder.go — embedding with versioning and caching
package rag
import (
"context"
"fmt"
"os"
"sync"
"time"
"github.com/sashabaranov/go-openai"
)
const EmbeddingVersion = "text-embedding-3-small::20240415"
type EmbeddingService struct {
client *openai.Client
version string
cache map[string][]float32
cacheMu sync.RWMutex
hitRate float64
}
func NewEmbeddingService() *EmbeddingService {
return &EmbeddingService{
client: openai.NewClient(os.Getenv("OPENAI_API_KEY")),
version: EmbeddingVersion,
cache: make(map[string][]float32),
}
}
// Embed calls OpenAI with in-memory caching and version tracking.
// Returns embeddings aligned to the input order of texts, so callers can
// assign results by index. Cache hits and freshly fetched vectors are both
// written back to their original position — never appended in arrival order.
func (es *EmbeddingService) Embed(ctx context.Context, texts []string) ([][]float32, error) {
embeddings := make([][]float32, len(texts))
var toEmbed []string
var toEmbedIdx []int // original index in texts for each uncached entry
hitCount := 0
// Check cache, recording the input position of every miss.
es.cacheMu.RLock()
for i, text := range texts {
if emb, ok := es.cache[text]; ok {
embeddings[i] = emb
hitCount++
} else {
toEmbed = append(toEmbed, text)
toEmbedIdx = append(toEmbedIdx, i)
}
}
es.cacheMu.RUnlock()
// Call API for uncached texts (batch up to 2048 per OpenAI limits)
if len(toEmbed) > 0 {
resp, err := es.client.CreateEmbeddings(ctx, openai.EmbeddingRequest{
Input: toEmbed,
Model: openai.SmallEmbedding3,
})
if err != nil {
return nil, fmt.Errorf("embedding API: %w", err)
}
if len(resp.Data) != len(toEmbed) {
return nil, fmt.Errorf("embedding API returned %d vectors for %d inputs", len(resp.Data), len(toEmbed))
}
// Cache each vector and place it at its ORIGINAL input index.
// OpenAI does not guarantee resp.Data is returned in request order — each
// item carries an Index into the request Input — so key by data.Index.
es.cacheMu.Lock()
for _, data := range resp.Data {
embedding := make([]float32, len(data.Embedding))
for j, v := range data.Embedding {
embedding[j] = float32(v)
}
es.cache[toEmbed[data.Index]] = embedding
embeddings[toEmbedIdx[data.Index]] = embedding
}
es.cacheMu.Unlock()
}
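// Track cache hit rate for monitoring. Guarded by cacheMu because Embed
// may be called from several goroutines at once.
if len(texts) > 0 {
es.cacheMu.Lock()
es.hitRate = float64(hitCount) / float64(len(texts))
es.cacheMu.Unlock()
}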
return embeddings, nil
}
// Version returns the embedding model version (stored in chunk metadata).
func (es *EmbeddingService) Version() string {
return es.version
}
In-memory cache in our deployments typically lands at 30-50% hit rate on ingestion batches and 60%+ on repeated queries, which directly cuts embedding API spend at ~$0.02 per 1M tokens[OpenAI Embeddings API]. The cache is application-level; for multi-instance deployments add Redis or memcached. As an order-of-magnitude example: 1M chunks of ~512 tokens × 2 ingestions/month is about 1B tokens, or roughly $20/month of embedding spend, so a 40% cache hit rate saves roughly $8/month. That is small in isolation but compounds across services.
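The arithmetic behind that kind of estimate is easy to keep next to your monitoring code. This is a back-of-the-envelope sketch; `monthlyEmbeddingCost` is a hypothetical helper, and the 512 tokens per chunk used in the example is an assumption taken from the target chunk size, not a measured average.

```go
package main

// monthlyEmbeddingCost estimates embedding spend in dollars for re-ingesting
// a corpus. It returns what you pay after cache hits (spend) and what the
// cache avoided (saved). cacheHitRate is a fraction between 0 and 1.
func monthlyEmbeddingCost(chunks, tokensPerChunk, ingestionsPerMonth int, pricePerMillionTokens, cacheHitRate float64) (spend, saved float64) {
	tokens := float64(chunks) * float64(tokensPerChunk) * float64(ingestionsPerMonth)
	full := tokens / 1e6 * pricePerMillionTokens
	saved = full * cacheHitRate
	return full - saved, saved
}
```

With 1,000,000 chunks, 512 tokens each, 2 ingestions per month, $0.02 per 1M tokens, and a 40% hit rate, the uncached bill is about $20.48, of which the cache avoids about $8.19.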
Vector Database with PostgreSQL and pgvector
pgvector is production-grade[pgvector README]: low operational overhead (runs in the same PostgreSQL instance as your application), cost-effective, and supports hybrid (vector + keyword) search within SQL. HNSW indexing provides sub-millisecond retrieval latency for corpora up to ~10M vectors in our experience.
Schema design captures provenance and enables efficient querying:
-- chunks table with pgvector
CREATE TABLE chunks (
id uuid PRIMARY KEY,
document_id uuid NOT NULL,
content text NOT NULL,
chunk_index int NOT NULL,
embedding vector(1536), -- OpenAI text-embedding-3-small
embedding_version varchar(50) NOT NULL, -- e.g., "text-embedding-3-small::20240415"
token_count int,
metadata jsonb, -- {source, category, author, chunk_section}
created_at timestamp DEFAULT now(),
updated_at timestamp DEFAULT now()
);
-- HNSW index for vector search (m=16, ef_construction=200)
-- This trades insertion cost (~5% slower writes) for query speed (2-3x faster)
CREATE INDEX idx_chunks_embedding ON chunks USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);
-- Metadata index for filtering
CREATE INDEX idx_chunks_metadata ON chunks USING gin(metadata);
-- Partial index for current embedding version (ignore deprecated)
CREATE INDEX idx_chunks_current_version ON chunks (embedding_version)
WHERE embedding_version = 'text-embedding-3-small::20240415';
-- DocumentID index for deletion (idempotent re-ingestion)
CREATE INDEX idx_chunks_document_id ON chunks(document_id);
Retrieval with fallback and graceful degradation:
// retriever.go
package rag
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log/slog"
pgvector "github.com/pgvector/pgvector-go"
)
type ChunkRepository struct {
db *sql.DB
logger *slog.Logger
}
// Search retrieves top-k chunks by cosine similarity, with optional metadata filtering.
// It does not fall back on its own: callers decide what to do with an empty
// result set or an error (see SearchWithFallback).
func (r *ChunkRepository) Search(
ctx context.Context,
queryEmbedding []float32,
topK int,
minScore float64,
filters map[string]string,
) ([]SearchResult, error) {
vec := pgvector.NewVector(queryEmbedding)
// Vector similarity search (<=> is cosine distance in pgvector)
query := `
SELECT id, document_id, content, metadata, token_count,
1 - (embedding <=> $1::vector) as similarity
FROM chunks
WHERE embedding_version = $2
AND 1 - (embedding <=> $1::vector) >= $3
`
args := []interface{}{vec, EmbeddingVersion, minScore}
// Add metadata filters (exact match). Keys and values are both bound as
// parameters so a caller-supplied key cannot inject SQL.
next := 4
for key, val := range filters {
query += fmt.Sprintf(" AND metadata->>$%d::text = $%d::text", next, next+1)
args = append(args, key, val)
next += 2
}
query += fmt.Sprintf(" ORDER BY embedding <=> $1::vector LIMIT $%d", next)
args = append(args, topK)
rows, err := r.db.QueryContext(ctx, query, args...)
if err != nil {
r.logger.Error("vector search failed", "error", err)
return nil, fmt.Errorf("query error: %w", err)
}
defer rows.Close()
var results []SearchResult
for rows.Next() {
var sr SearchResult
var rawMeta []byte // jsonb arrives as bytes; database/sql cannot scan into a map
var tokCount sql.NullInt64 // token_count is nullable in the schema
if err := rows.Scan(&sr.Chunk.ID, &sr.Chunk.DocumentID, &sr.Chunk.Content,
&rawMeta, &tokCount, &sr.Score); err != nil {
return nil, fmt.Errorf("scan chunk: %w", err)
}
// Metadata values are expected to be JSON strings (map[string]string).
if len(rawMeta) > 0 {
if err := json.Unmarshal(rawMeta, &sr.Chunk.Metadata); err != nil {
return nil, fmt.Errorf("decode metadata: %w", err)
}
}
sr.Chunk.TokenCount = int(tokCount.Int64)
results = append(results, sr)
}
return results, rows.Err()
}
Production Patterns
Pattern 1: Embedding Cache with LRU Eviction
type EmbeddingCache struct {
mu sync.RWMutex
cache map[string][]float32
lru []string
maxSize int
}
func (ec *EmbeddingCache) GetOrCompute(text string, fn func() ([]float32, error)) ([]float32, error) {
ec.mu.RLock()
if emb, ok := ec.cache[text]; ok {
ec.mu.RUnlock()
return emb, nil
}
ec.mu.RUnlock()
emb, err := fn()
if err != nil {
return nil, err
}
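ec.mu.Lock()
defer ec.mu.Unlock()
// Another goroutine may have stored this text while fn ran; keep one entry.
if existing, ok := ec.cache[text]; ok {
return existing, nil
}
ec.cache[text] = emb
ec.lru = append(ec.lru, text)
// Evict the oldest 20% (at least one entry) when the cache exceeds maxSize.
// Eviction follows insertion order, so this only approximates LRU: reads do
// not refresh an entry's position.
if len(ec.cache) > ec.maxSize {
evict := ec.maxSize / 5
if evict < 1 {
evict = 1
}
for i := 0; i < evict && len(ec.lru) > 0; i++ {
old := ec.lru[0]
ec.lru = ec.lru[1:]
delete(ec.cache, old)
}
}
return emb, nil
}
Cost impact: $0.02/1M tokens at a 40-50% cache hit rate is roughly $0.010-0.012/1M tokens effective. At 1M chunks of ~512 tokens × 2 ingestions/month, embedding spend is about $20/month, so the cache saves roughly $8-10/month, scaling to roughly $80-100/month at 10M chunks. [Dean & Barroso, 2013]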
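If strict least-recently-used behaviour matters (for example, a small set of hot queries that must never be evicted by a bulk ingestion run), the standard library's `container/list` gives it in a few lines. This is a sketch under stated assumptions: `LRUEmbeddingCache`, `NewLRUEmbeddingCache`, and `lruEntry` are hypothetical names, and entries are evicted one at a time rather than in 20% batches.

```go
package main

import (
	"container/list"
	"sync"
)

type lruEntry struct {
	key string
	emb []float32
}

// LRUEmbeddingCache evicts the least recently used entry once it holds more
// than maxSize items. Reads count as use.
type LRUEmbeddingCache struct {
	mu      sync.Mutex
	items   map[string]*list.Element
	order   *list.List // front = most recently used
	maxSize int
}

func NewLRUEmbeddingCache(maxSize int) *LRUEmbeddingCache {
	return &LRUEmbeddingCache{
		items:   make(map[string]*list.Element),
		order:   list.New(),
		maxSize: maxSize,
	}
}

// GetOrCompute returns the cached embedding for text, or calls fn and stores
// the result. fn runs outside the lock so a slow API call does not block
// other callers; two concurrent misses on the same text may both call fn.
func (c *LRUEmbeddingCache) GetOrCompute(text string, fn func() ([]float32, error)) ([]float32, error) {
	c.mu.Lock()
	if el, ok := c.items[text]; ok {
		c.order.MoveToFront(el)
		emb := el.Value.(*lruEntry).emb
		c.mu.Unlock()
		return emb, nil
	}
	c.mu.Unlock()

	emb, err := fn()
	if err != nil {
		return nil, err
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.items[text]; ok { // filled by another goroutine meanwhile
		c.order.MoveToFront(el)
		return el.Value.(*lruEntry).emb, nil
	}
	c.items[text] = c.order.PushFront(&lruEntry{key: text, emb: emb})
	for c.order.Len() > c.maxSize {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*lruEntry).key)
	}
	return emb, nil
}
```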
Pattern 2: Graceful Degradation
When the vector DB is unavailable, fall back to pre-computed popular results:
func (r *ChunkRepository) SearchWithFallback(
ctx context.Context,
queryEmbedding []float32,
topK int,
) ([]SearchResult, error) {
results, err := r.Search(ctx, queryEmbedding, topK, 0.5, nil)
if err != nil && isConnectionError(err) {
// Vector DB is down; return cached top-100 popular chunks
r.logger.Warn("vector DB unavailable, using fallback cache")
return r.cachedPopularResults(), nil
}
if err != nil {
return nil, err
}
return results, nil
}
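// isConnectionError reports whether err looks like a transport-level failure
// (timeout, refused or reset connection) rather than a query error.
// Needs the "context", "errors", "net", and "strings" imports.
func isConnectionError(err error) bool {
if errors.Is(err, context.DeadlineExceeded) {
return true
}
var netErr net.Error
if errors.As(err, &netErr) {
return true
}
// Fallback for drivers that only expose the failure as text.
s := err.Error()
return strings.Contains(s, "connection refused") ||
strings.Contains(s, "i/o timeout") ||
strings.Contains(s, "connection reset")
}
Benefit: a degraded response (cached popular results, or HTTP 202 with an empty result set) beats a 500 Internal Server Error. Users understand "search is loading" better than "system failed."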
Pattern 3: Reranking with Maximal Marginal Relevance
Balance relevance and diversity in the top-K set:
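// mmr.go
package rag
import "math"
// ComputeMMR greedily picks up to topK results, trading relevance (Score)
// against similarity to results already picked. It needs Chunk.Embedding
// populated on every result; the Search query shown earlier does not select
// the embedding column, so add it there before reranking.
func ComputeMMR(results []SearchResult, lambda float64, topK int) []SearchResult {
selected := []SearchResult{}
// Copy so that removing picked items does not reorder the caller's slice.
remaining := append([]SearchResult(nil), results...)
for len(selected) < topK && len(remaining) > 0 {
bestIdx := 0
bestScore := lambda*remaining[0].Score - (1-lambda)*maxSimilarityToSelected(remaining[0], selected)
for i := 1; i < len(remaining); i++ {
score := lambda*remaining[i].Score - (1-lambda)*maxSimilarityToSelected(remaining[i], selected)
if score > bestScore {
bestScore = score
bestIdx = i
}
}
picked := remaining[bestIdx]
picked.RankScore = bestScore // record the MMR score used for ordering
selected = append(selected, picked)
remaining = append(remaining[:bestIdx], remaining[bestIdx+1:]...)
}
return selected
}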
func cosineSimilarity(a, b []float32) float64 {
var dot float64
var normA, normB float64
for i := range a {
dot += float64(a[i]) * float64(b[i])
normA += float64(a[i]) * float64(a[i])
normB += float64(b[i]) * float64(b[i])
}
if normA == 0 || normB == 0 {
return 0
}
return dot / (math.Sqrt(normA) * math.Sqrt(normB))
}
func maxSimilarityToSelected(r SearchResult, selected []SearchResult) float64 {
maxSim := 0.0
for _, s := range selected {
sim := cosineSimilarity(r.Chunk.Embedding, s.Chunk.Embedding)
if sim > maxSim {
maxSim = sim
}
}
return maxSim
}
When to use: Set lambda = 0.6 (60% relevance, 40% diversity) to reduce duplicate content in results. Typical use case: multiple chunks from the same section or adjacent paragraphs. Without MMR, all top-10 results might come from one article. [OWASP LLM Top 10]
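A tiny worked example shows the effect of lambda. The sketch below is a compact, self-contained variant of the same greedy selection, using hypothetical `candidate`, `cosine`, and `mmr` names and made-up two-dimensional vectors: `A` and `A2` are near-duplicates, `B` is unrelated but slightly less relevant.

```go
package main

import "math"

type candidate struct {
	ID    string
	Score float64   // relevance to the query
	Vec   []float64 // chunk embedding (toy 2-D values here)
}

func cosine(a, b []float64) float64 {
	var dot, na, nb float64
	for i := range a {
		dot += a[i] * b[i]
		na += a[i] * a[i]
		nb += b[i] * b[i]
	}
	if na == 0 || nb == 0 {
		return 0
	}
	return dot / (math.Sqrt(na) * math.Sqrt(nb))
}

// mmr returns the IDs of up to k candidates chosen greedily by
// lambda*relevance - (1-lambda)*maxSimilarityToAlreadyPicked.
func mmr(cands []candidate, lambda float64, k int) []string {
	remaining := append([]candidate(nil), cands...)
	var picked []candidate
	var ids []string
	for len(picked) < k && len(remaining) > 0 {
		best, bestScore := 0, math.Inf(-1)
		for i, c := range remaining {
			penalty := 0.0
			for _, p := range picked {
				penalty = math.Max(penalty, cosine(c.Vec, p.Vec))
			}
			if s := lambda*c.Score - (1-lambda)*penalty; s > bestScore {
				best, bestScore = i, s
			}
		}
		picked = append(picked, remaining[best])
		ids = append(ids, remaining[best].ID)
		remaining = append(remaining[:best], remaining[best+1:]...)
	}
	return ids
}

// exampleCandidates: A and A2 point almost the same way; B is orthogonal.
func exampleCandidates() []candidate {
	return []candidate{
		{ID: "A", Score: 0.90, Vec: []float64{1, 0}},
		{ID: "A2", Score: 0.89, Vec: []float64{0.99, 0.14}},
		{ID: "B", Score: 0.80, Vec: []float64{0, 1}},
	}
}
```

With lambda = 0.6 and k = 2, the second pick is `B`: after `A` is chosen, `A2` scores about 0.6 × 0.89 − 0.4 × 0.99 ≈ 0.14 while `B` scores 0.6 × 0.80 − 0 = 0.48. With lambda = 1.0 the diversity term vanishes and the result is plain relevance order, `A` then `A2`.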
Evaluation Pipeline
Build a golden test set of 100-150 curated (query, expected_chunk_ids) pairs. Evaluate on every pipeline change (chunking strategy, embedding model, index parameters). Without measurement, regressions are invisible until users complain.
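The metrics themselves are a few lines each. The definitions below are one plausible reading, labeled as assumptions: `precision` is the share of retrieved chunks that were expected, `recall` is the share of expected chunks that were retrieved, and `hitAny` counts a query as a hit when at least one expected chunk comes back. `overlapCount` and `hitAny` are hypothetical helper names.

```go
package main

// overlapCount returns how many distinct expected IDs appear in retrieved.
func overlapCount(retrieved, expected []string) int {
	want := make(map[string]struct{}, len(expected))
	for _, id := range expected {
		want[id] = struct{}{}
	}
	seen := make(map[string]struct{})
	for _, id := range retrieved {
		if _, ok := want[id]; ok {
			seen[id] = struct{}{}
		}
	}
	return len(seen)
}

// precision: fraction of retrieved chunks that were expected.
func precision(retrieved, expected []string) float64 {
	if len(retrieved) == 0 {
		return 0
	}
	return float64(overlapCount(retrieved, expected)) / float64(len(retrieved))
}

// recall: fraction of expected chunks that were retrieved.
func recall(retrieved, expected []string) float64 {
	if len(expected) == 0 {
		return 0
	}
	return float64(overlapCount(retrieved, expected)) / float64(len(expected))
}

// hitAny reports whether at least one expected chunk was retrieved.
func hitAny(retrieved, expected []string) bool {
	return overlapCount(retrieved, expected) > 0
}
```

Note that precision over a fixed top-10 is bounded by the number of expected chunks per query: a golden case with two expected chunks can reach at most 0.2, so set thresholds with your labeling density in mind.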
// eval_test.go
package rag
import (
"context"
"testing"
"time"
)
type EvalCase struct {
Query string
ExpectedChunkIDs []string
Category string
}
func TestRetrievalQuality(t *testing.T) {
goldenSet := loadGoldenTestSet("testdata/golden_evals.json")
var hitRates, precisions, recalls []float64
var latencies []time.Duration
for _, tc := range goldenSet {
emb, err := embedder.Embed(context.Background(), []string{tc.Query})
if err != nil {
t.Fatalf("embed: %v", err)
}
start := time.Now()
results, err := repo.Search(context.Background(), emb[0], 10, 0.5, nil)
latencies = append(latencies, time.Since(start))
if err != nil {
t.Fatalf("search: %v", err)
}
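// Compute metrics. A hit means the first expected chunk was retrieved.
if len(tc.ExpectedChunkIDs) == 0 {
t.Fatalf("golden case %q has no expected chunk IDs", tc.Query)
}
retrieved := extractIDs(results)
hit := contains(retrieved, tc.ExpectedChunkIDs[0])
hitRates = append(hitRates, boolToFloat(hit))
// Use distinct variable names so the precision/recall helpers are not shadowed.
prec := precision(retrieved, tc.ExpectedChunkIDs)
rec := recall(retrieved, tc.ExpectedChunkIDs)
precisions = append(precisions, prec)
recalls = append(recalls, rec)
}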
// Assert thresholds (fail if regression detected)
avgHit := avg(hitRates)
if avgHit < 0.75 {
t.Errorf("hit rate %.3f below threshold 0.75", avgHit)
}
avgPrec := avg(precisions)
if avgPrec < 0.65 {
t.Errorf("precision %.3f below threshold 0.65", avgPrec)
}
p99Latency := percentile(latencies, 99)
if p99Latency > 500*time.Millisecond {
t.Errorf("p99 latency %v exceeds 500ms threshold", p99Latency)
}
t.Logf("Hit Rate: %.3f | Precision: %.3f | Recall: %.3f | p99 Latency: %v",
avgHit, avgPrec, avg(recalls), p99Latency)
}
Ingestion Pipeline
Ingestion is where document insertion meets embedding parallelization. Idempotent re-ingestion (replacing a document deletes its old chunks and inserts new ones atomically) is required for document updates. Batching embedding calls is critical for cost control.
// ingest.go
package rag
import (
"context"
"fmt"
"log/slog"
"time"
)
type Ingester struct {
chunker *SemanticChunker
embedder *EmbeddingService
repo *ChunkRepository
logger *slog.Logger
}
func NewIngester(
chunker *SemanticChunker,
embedder *EmbeddingService,
repo *ChunkRepository,
logger *slog.Logger,
) *Ingester {
return &Ingester{
chunker: chunker,
embedder: embedder,
repo: repo,
logger: logger,
}
}
// Ingest processes a document: chunk → embed → replace stored chunks.
// Embedding runs before anything is deleted, so a failed API call leaves the
// previous chunks in place rather than an empty document.
func (ing *Ingester) Ingest(ctx context.Context, doc Document) error {
start := time.Now()
// Step 1: Chunk the document
chunks := ing.chunker.Chunk(doc)
if len(chunks) == 0 {
ing.logger.Warn("document produced no chunks",
"document_id", doc.ID,
"title", doc.Title)
// Nothing to embed; still drop chunks left over from an earlier version.
if err := ing.repo.DeleteByDocument(ctx, doc.ID); err != nil {
return fmt.Errorf("delete existing chunks: %w", err)
}
return nil
}
// Step 2: Embed chunks in batches (OpenAI limit: 2048 inputs per request)
const embedBatchSize = 256
for i := 0; i < len(chunks); i += embedBatchSize {
end := i + embedBatchSize
if end > len(chunks) {
end = len(chunks)
}
texts := make([]string, end-i)
for j := i; j < end; j++ {
texts[j-i] = chunks[j].Content
}
embeddings, err := ing.embedder.Embed(ctx, texts)
if err != nil {
return fmt.Errorf("embed batch %d-%d: %w", i, end, err)
}
for j, emb := range embeddings {
chunks[i+j].Embedding = emb
chunks[i+j].EmbeddingVersion = ing.embedder.Version()
}
}
// Step 3: Delete existing chunks, then store the new ones. For a truly atomic
// swap, DeleteByDocument and BatchUpsert should share one database transaction.
if err := ing.repo.DeleteByDocument(ctx, doc.ID); err != nil {
return fmt.Errorf("delete existing chunks: %w", err)
}
if err := ing.repo.BatchUpsert(ctx, chunks); err != nil {
return fmt.Errorf("store chunks: %w", err)
}
ing.logger.Info("document ingested",
"document_id", doc.ID,
"title", doc.Title,
"chunks", len(chunks),
"duration", time.Since(start),
)
return nil
}
// IngestBatch processes multiple documents concurrently with bounded parallelism.
func (ing *Ingester) IngestBatch(
ctx context.Context, docs []Document, concurrency int,
) error {
sem := make(chan struct{}, concurrency)
errCh := make(chan error, len(docs))
for _, doc := range docs {
sem <- struct{}{}
go func(d Document) {
defer func() { <-sem }()
if err := ing.Ingest(ctx, d); err != nil {
errCh <- fmt.Errorf("ingest %s: %w", d.ID, err)
}
}(doc)
}
// Wait for all goroutines to finish
for i := 0; i < cap(sem); i++ {
sem <- struct{}{}
}
close(errCh)
var errs []error
for err := range errCh {
errs = append(errs, err)
}
if len(errs) > 0 {
return fmt.Errorf("%d documents failed ingestion: %w", len(errs), errs[0])
}
return nil
}
Batching strategy: Embed 256 chunks per API call (OpenAI accepts up to 2048 inputs per request). Embedding is billed per token, so batching does not change token cost; it cuts request count, per-request overhead, and rate-limit pressure. On 100k chunks, 256 per batch is 391 API calls instead of 100,000. [OpenAI Embeddings API]
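The request count follows from a ceiling division, which is worth having as a helper so the last, shorter batch is not forgotten. A minimal sketch, with `span` and `batchSpans` as hypothetical names:

```go
package main

// span is a half-open index range [Start, End) into a slice of chunks.
type span struct{ Start, End int }

// batchSpans splits n items into consecutive ranges of at most size items.
// The final range is shorter when n is not a multiple of size.
func batchSpans(n, size int) []span {
	if n <= 0 || size <= 0 {
		return nil
	}
	spans := make([]span, 0, (n+size-1)/size)
	for start := 0; start < n; start += size {
		end := start + size
		if end > n {
			end = n
		}
		spans = append(spans, span{Start: start, End: end})
	}
	return spans
}
```

For 100,000 chunks at 256 per batch this yields 391 ranges: 390 full batches plus a final batch of 160.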
Deployment Checklist
Before production, verify:
| Check | How | Why |
|---|---|---|
| Embedding versioning | Every chunk: embedding_version = "text-embedding-3-small::20240415" | Prevents silent vector store corruption on API updates |
| Re-embed capability | Can re-embed all chunks within 24 hours | Model upgrades require full re-indexing |
| Golden test set | Minimum 100 curated (query, expected_chunks) pairs | Unlocks quality metrics |
| Eval in CI | Retrieval tests run on every PR; block if hit rate drops >5% | Catches regressions from code changes |
| Graceful degradation | Vector DB down → cached top-10 results, not 500 error | Better UX; users understand "search loading" |
| Index tuning | HNSW m=16, ef_construction=200 for 1M chunks | Latency vs. recall tradeoff; adjust for your scale |
| Rate limits | Embedding calls batched (256 per request); quota monitored | Prevents one noisy tenant from exhausting API quota |
| Cost tracking | Tokens logged per query and ingestion batch | Embedding + generation costs compound |
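The "block if hit rate drops >5%" check from the table can be a one-function gate in CI. This sketch assumes the 5% means absolute percentage points against a stored baseline (for example, the hit rate on the main branch); `checkHitRate` is a hypothetical helper.

```go
package main

import "fmt"

// checkHitRate returns an error when current has fallen more than maxDrop
// below baseline. All values are fractions, so maxDrop = 0.05 means five
// percentage points.
func checkHitRate(baseline, current, maxDrop float64) error {
	if drop := baseline - current; drop > maxDrop {
		return fmt.Errorf("hit rate dropped by %.3f (baseline %.3f, current %.3f, allowed %.3f)",
			drop, baseline, current, maxDrop)
	}
	return nil
}
```

Comparing against a baseline catches gradual erosion that a fixed absolute threshold would allow, as long as the baseline is only updated deliberately.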
The most common mistake in RAG systems is over-engineering the initial implementation. Start with fixed-size chunking at 512 tokens, OpenAI text-embedding-3-small, pgvector with default HNSW parameters, and top-10 retrieval. Build the evaluation pipeline on day one. Measure hit rate, precision, and recall. Only then optimize: switch to semantic chunking if chunk boundaries are the bottleneck, increase HNSW m if recall is low, add hybrid search if keyword queries underperform. Every optimization should be justified by a metric improvement on your golden test set.
The difference between a RAG demo and a RAG system is the evaluation pipeline, operational instrumentation, and graceful degradation paths. Ship simple, measure rigorously, iterate on the metrics.
Frequently Asked Questions
Why do RAG prototypes fail in production?
Prototypes use naive fixed-size chunking that splits mid-paragraph destroying context, lack embedding versioning so API updates silently corrupt the vector store, have no evaluation pipeline to detect retrieval degradation, and use default index parameters that cannot sustain concurrent query load.
What chunking strategy works best for RAG?
Use semantic chunking that splits on paragraph and section boundaries rather than fixed token counts. Preserve document structure (headings, lists) within chunks, add overlap between chunks (10-20%) to maintain context at boundaries, and include metadata (source, section title) with each chunk for filtering. [pgvector README]
How do you evaluate RAG retrieval quality?
Build an evaluation pipeline with a curated set of question-answer pairs. Measure retrieval precision (are the retrieved chunks relevant?) and recall (are relevant chunks being found?). Run evaluations on every pipeline change — chunking strategy, embedding model, or index parameter update — to catch regressions before users do.
Why use Go instead of Python for RAG pipelines?
The RAG pipeline — ingestion, chunking, embedding API calls, vector DB operations, HTTP serving — is I/O-bound infrastructure code. Go's concurrency model, static typing, and single-binary deployment optimize for reliability and operational simplicity. The LLM and embedding models run remotely via API, so your language choice for pipeline orchestration does not need ML library access.
Keep Reading
- Vector Databases Compared: pgvector vs Pinecone vs Weaviate — Benchmarks, scaling limits, and migration thresholds for choosing the right vector store
- LLM API Integration Patterns — Streaming, retry logic, token budgets, and cost tracking for the LLM calls your RAG pipeline feeds into
- Database Indexing Strategies — B-tree, GIN, and GiST index internals that apply to the metadata and full-text indexes alongside your HNSW vectors
Engineering Team
A multidisciplinary team of backend engineers, architects, and DevOps practitioners shipping deep dives into distributed systems and production infrastructure.