Haystack Tutorial (Python): building a RAG pipeline for advanced developers
This tutorial builds a production-style Retrieval-Augmented Generation pipeline in Haystack using Python, with document ingestion, embedding-based retrieval, and grounded answer generation. You’d use this when a plain chat model is not enough and you need answers constrained to your own internal docs, policies, or knowledge base.
What You'll Need
- Python 3.10+
- haystack-ai
- An OpenAI API key
- A small local corpus of text files or documents
- Optional: python-dotenv for loading secrets from .env
Install the packages:
pip install haystack-ai openai python-dotenv
Set your API key:
export OPENAI_API_KEY="your-key-here"
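If you went the python-dotenv route instead, a minimal loading sketch (assuming your key lives in a local .env file) looks like this:

# Load OPENAI_API_KEY from a local .env file into the environment
from dotenv import load_dotenv

load_dotenv()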
Step-by-Step
- Start by creating a small document set and indexing it into an in-memory document store. For advanced use cases, keep ingestion separate from query-time code so you can swap the store later without rewriting the pipeline (a sketch of that separation follows the code below).
from haystack import Document
from haystack.document_stores.in_memory import InMemoryDocumentStore

documents = [
    Document(content="Haystack is an open-source framework for building LLM applications."),
    Document(content="RAG combines retrieval with generation to ground answers in source data."),
    Document(content="For production systems, chunking and metadata matter as much as embeddings."),
]

# Use cosine similarity instead of the store's default dot product
document_store = InMemoryDocumentStore(embedding_similarity_function="cosine")
document_store.write_documents(documents)
print(f"Stored {document_store.count_documents()} documents")
- Next, embed the documents and write vectors back into the store. This step is what makes semantic retrieval work; without embeddings, your retriever is just doing keyword matching.
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack.document_stores.types import DuplicatePolicy

doc_embedder = OpenAIDocumentEmbedder(model="text-embedding-3-small")
embedded_docs = doc_embedder.run(documents=documents)["documents"]

# Overwrite the unembedded copies written in the previous step; the
# store would otherwise reject documents with duplicate IDs.
document_store.write_documents(embedded_docs, policy=DuplicatePolicy.OVERWRITE)
print("Documents embedded and stored")
- Build a retriever and a chat prompt builder. The retriever pulls the most relevant chunks, and the prompt builder turns those chunks into grounded chat messages that the generator can answer from.
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage

# ChatPromptBuilder renders a Jinja2 template into chat messages,
# which is what the chat generator expects downstream.
template = [
    ChatMessage.from_user(
        """
Answer the question using only the provided documents.

Documents:
{% for doc in documents %}
- {{ doc.content }}
{% endfor %}

Question: {{ question }}
Answer:
"""
    )
]

retriever = InMemoryEmbeddingRetriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(template=template)
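You can run the builder on its own to inspect the exact prompt the generator will see; a quick sketch using two of the documents from earlier:

# Render the template outside the pipeline to eyeball the grounded prompt
rendered = prompt_builder.run(documents=documents[:2], question="What is RAG?")
print(rendered["prompt"][0].text)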
- Add the query embedder and generator, then connect everything into a pipeline. This is the part that matters operationally: query embedding, retrieval, prompt construction, and generation should be explicit nodes so you can inspect each stage during debugging.
from haystack import Pipeline
from haystack.components.embedders import OpenAITextEmbedder
from haystack.components.generators.chat import OpenAIChatGenerator

# The query must be embedded with the same model used for the documents
text_embedder = OpenAITextEmbedder(model="text-embedding-3-small")
generator = OpenAIChatGenerator(model="gpt-4o-mini")

rag_pipeline = Pipeline()
rag_pipeline.add_component("text_embedder", text_embedder)
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("generator", generator)

rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder.prompt", "generator.messages")
- Run a query against the pipeline. Use a question that should be answerable from your indexed content; in production, you’d also log retrieved documents and token usage for traceability.
question = "What makes RAG useful in production systems?"

# The embedder receives the raw query text; the retriever and generator
# get their inputs through the pipeline connections.
result = rag_pipeline.run(
    {
        "text_embedder": {"text": question},
        "prompt_builder": {"question": question},
    }
)

answer = result["generator"]["replies"][0].text
print(answer)
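For the traceability mentioned above, one sketch: Pipeline.run accepts include_outputs_from to surface intermediate results, and each reply carries the usage metadata reported by OpenAI:

# Surface the retriever's output alongside the final answer
traced = rag_pipeline.run(
    {
        "text_embedder": {"text": question},
        "prompt_builder": {"question": question},
    },
    include_outputs_from={"retriever"},
)
for doc in traced["retriever"]["documents"]:
    print(f"retrieved: score={doc.score:.3f} content={doc.content[:60]}")
print(traced["generator"]["replies"][0].meta.get("usage"))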
- If you want stronger control, inspect retrieved documents before generation. This is how you debug bad answers: check whether retrieval failed before blaming the model.
# The retriever works on vectors, so embed the query first when
# running it outside the pipeline.
query_embedding = text_embedder.run(text=question)["embedding"]
retrieval_result = retriever.run(query_embedding=query_embedding)

for i, doc in enumerate(retrieval_result["documents"], start=1):
    print(f"{i}. score={doc.score:.3f}  {doc.content}")
Testing It
Run the script end to end and confirm that the answer references only your supplied content. If it starts inventing details not present in the documents, tighten your prompt or improve retrieval quality by adding better chunks and metadata.
Then try a question that should fail, like asking for information not in the corpus. A good RAG pipeline should either say it does not know or produce a clearly constrained answer instead of hallucinating.
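A sketch of that negative test; the question below is deliberately outside the three-document corpus:

# The corpus says nothing about this, so a well-grounded pipeline
# should decline rather than invent an answer.
off_topic = "What year was Haystack founded?"
result = rag_pipeline.run(
    {
        "text_embedder": {"text": off_topic},
        "prompt_builder": {"question": off_topic},
    }
)
print(result["generator"]["replies"][0].text)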
For deeper validation, compare retrieved documents against expected relevance manually for a few test queries. That gives you a baseline before you move to larger corpora or add reranking.
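A tiny harness makes that manual baseline repeatable; the queries and expected substrings below are placeholders you would swap for your own judgments:

# Spot-check retrieval: each query should surface a document containing
# the expected substring (hand-picked, not a real evaluation metric).
checks = {
    "What is Haystack?": "open-source framework",
    "Why does chunking matter?": "chunking and metadata",
}
for query, expected in checks.items():
    embedding = text_embedder.run(text=query)["embedding"]
    docs = retriever.run(query_embedding=embedding)["documents"]
    status = "PASS" if any(expected in d.content for d in docs) else "MISS"
    print(f"{status}: {query}")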
Next Steps
- Add chunking with overlap before embedding so long documents retrieve better (see the sketch after this list).
- Swap InMemoryDocumentStore for Elasticsearch or Qdrant when you need persistence.
- Add a reranker between retrieval and prompting for better precision on ambiguous queries.
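For the chunking item, a sketch using Haystack's DocumentSplitter; the split sizes are illustrative, not tuned:

# Split long documents into overlapping word-based chunks before embedding
from haystack.components.preprocessors import DocumentSplitter

splitter = DocumentSplitter(split_by="word", split_length=200, split_overlap=20)
chunks = splitter.run(documents=documents)["documents"]
print(f"{len(documents)} documents -> {len(chunks)} chunks")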
Keep learning
- The complete AI Agents Roadmap — my full 8-step breakdown
- Free: The AI Agent Starter Kit — PDF checklist + starter code
- Work with me — I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.
Want the complete 8-step roadmap?
Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.
Get the Starter Kit