Haystack Tutorial (Python): caching embeddings for advanced developers

By Cyprian Aarons · Updated 2026-04-21

This tutorial shows how to cache embeddings in a Haystack pipeline so repeated document ingestion does not recompute vectors for unchanged content. You need this when you re-index the same corpus often, run batch jobs on expensive embedding models, or want predictable latency and cost.

What You'll Need

  • Python 3.10+
  • haystack-ai
  • sentence-transformers
  • A working internet connection for the first model download
  • Optional: numpy if you want to inspect vector shapes
  • A local file or dataset with text documents to index

Install the packages:

pip install haystack-ai sentence-transformers

Step-by-Step

  1. Start by creating a small document set and a deterministic cache key for each document. The key should change whenever the text changes; otherwise you will reuse stale embeddings.
import hashlib
from haystack import Document

def embedding_cache_key(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

docs = [
    Document(content="Haystack is a framework for building LLM applications."),
    Document(content="Embedding caches save time when documents are reprocessed."),
]

for doc in docs:
    doc.meta["cache_key"] = embedding_cache_key(doc.content)
    print(doc.meta["cache_key"], doc.content)
  2. Next, load an embedder and compute embeddings only for documents that are not already cached. For this tutorial, the cache is an in-memory dictionary keyed by your content hash, which is enough to prove the pattern before moving it to Redis or Postgres.
from haystack.components.embedders import SentenceTransformersDocumentEmbedder

embedder = SentenceTransformersDocumentEmbedder(
    model="sentence-transformers/all-MiniLM-L6-v2"
)
embedder.warm_up()

# In-memory cache mapping content hash -> embedding vector.
embedding_cache = {}

def embed_with_cache(document: Document):
    cache_key = document.meta["cache_key"]
    if cache_key in embedding_cache:
        # Cache hit: reuse the stored vector and skip model inference.
        document.embedding = embedding_cache[cache_key]
        return document, True

    # Cache miss: embed the document and remember its vector.
    result = embedder.run(documents=[document])
    embedded_doc = result["documents"][0]
    embedding_cache[cache_key] = embedded_doc.embedding
    return embedded_doc, False
  3. Run the first pass and store the embeddings. On a second pass with the same text, the code should hit the cache and skip model inference.
first_pass = []
for doc in docs:
    embedded_doc, cached = embed_with_cache(doc)
    first_pass.append((embedded_doc, cached))
    print(f"cached={cached} dim={len(embedded_doc.embedding)}")

second_pass = []
for doc in docs:
    embedded_doc, cached = embed_with_cache(doc)
    second_pass.append((embedded_doc, cached))
    print(f"cached={cached} dim={len(embedded_doc.embedding)}")
  4. If you want this to work in a real pipeline, put the cache lookup before indexing and retrieval. The important part is that your pipeline sees documents with embeddings already attached, so downstream components do not pay the embedding cost again.
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()
writer = DocumentWriter(document_store=document_store)

embedded_docs = []
for doc in docs:
    embedded_doc, _ = embed_with_cache(doc)
    embedded_docs.append(embedded_doc)

writer.run(documents=embedded_docs)

stored_docs = document_store.filter_documents()
print(len(stored_docs))
print(stored_docs[0].embedding[:5])
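
To confirm that downstream components reuse the stored vectors instead of re-embedding, here is a minimal retrieval sketch against the document_store populated above. It assumes the same MiniLM model for the query-side embedder; the import paths match recent haystack-ai releases and may differ slightly in older versions.

from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever

# Query-side embedder; it must match the model used for the documents.
text_embedder = SentenceTransformersTextEmbedder(
    model="sentence-transformers/all-MiniLM-L6-v2"
)
text_embedder.warm_up()

retriever = InMemoryEmbeddingRetriever(document_store=document_store)

# Embed the query once, then search against the cached document vectors.
query_embedding = text_embedder.run(text="Why cache embeddings?")["embedding"]
results = retriever.run(query_embedding=query_embedding, top_k=1)
print(results["documents"][0].content)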
  5. To make caching production-grade, persist the cache outside process memory. Use Redis, SQLite, or Postgres so your next job run can reuse embeddings after restarts.
import json
import sqlite3

conn = sqlite3.connect("embeddings_cache.sqlite")
conn.execute(
    "CREATE TABLE IF NOT EXISTS embeddings (cache_key TEXT PRIMARY KEY, vector TEXT NOT NULL)"
)

def save_embedding(cache_key: str, vector):
    conn.execute(
        "INSERT OR REPLACE INTO embeddings (cache_key, vector) VALUES (?, ?)",
        (cache_key, json.dumps(vector)),
    )
    conn.commit()

def load_embedding(cache_key: str):
    row = conn.execute(
        "SELECT vector FROM embeddings WHERE cache_key=?",
        (cache_key,),
    ).fetchone()
    return json.loads(row[0]) if row else None
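
To reuse these helpers in the earlier flow, a persistent variant of the caching function might look like the sketch below. embed_with_persistent_cache is an illustrative name; it assumes the embedder, save_embedding, and load_embedding defined above are in scope.

def embed_with_persistent_cache(document):
    # Same pattern as embed_with_cache, but the vectors survive restarts.
    cache_key = document.meta["cache_key"]

    vector = load_embedding(cache_key)
    if vector is not None:
        document.embedding = vector
        return document, True

    # Cache miss: embed, then persist the vector for the next job run.
    embedded_doc = embedder.run(documents=[document])["documents"][0]
    save_embedding(cache_key, embedded_doc.embedding)
    return embedded_doc, False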

Testing It

Run the script twice and watch the output flip from cached=False on the first pass to cached=True on the second. That tells you your keying strategy is stable and the cache is being hit.

If you change one document’s text and rerun it, only that document should miss the cache and trigger a fresh embedding call. That is the behavior you want in ingestion pipelines where most content stays unchanged.
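
A quick way to check that behavior, assuming the in-memory embed_with_cache and embedding_cache_key from the steps above are still in scope:

# Simulate an edit: only the changed document should miss the cache.
docs[0].content = "Haystack is a framework for building production LLM applications."
docs[0].meta["cache_key"] = embedding_cache_key(docs[0].content)

for doc in docs:
    _, cached = embed_with_cache(doc)
    print(f"cached={cached} :: {doc.content[:40]}")
# Expected: cached=False for the edited document, cached=True for the rest.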

Also check that every stored document has an embedding attached before writing to your store. If embeddings are missing, retrieval quality will fail later and debugging becomes annoying fast.
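
A small guard before writer.run catches that early; the 384-dimension check applies to all-MiniLM-L6-v2 specifically and should be adjusted for other models.

# Fail fast if any document is about to be written without a vector.
for doc in embedded_docs:
    assert doc.embedding is not None, f"missing embedding for {doc.meta['cache_key']}"
    assert len(doc.embedding) == 384  # all-MiniLM-L6-v2 produces 384-dim vectors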

Next Steps

  • Move from in-memory caching to Redis with TTLs for distributed workers
  • Add versioning to your cache key so model upgrades invalidate old vectors cleanly (see the sketch after this list)
  • Combine this with Haystack retrievers and evaluators to measure cost savings versus recall
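
For the versioning idea above, a minimal sketch: fold the model identifier (and any preprocessing version) into the hash so upgrading the model automatically misses old cache entries. versioned_cache_key and EMBEDDING_MODEL are illustrative names, not Haystack APIs.

import hashlib

EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

def versioned_cache_key(text: str, model: str = EMBEDDING_MODEL) -> str:
    # Changing the model (or a preprocessing version tag) changes every key,
    # so stale vectors are never reused after an upgrade.
    payload = f"{model}::{text}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()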

By Cyprian Aarons, AI Consultant at Topiax.