Haystack Tutorial (Python): adding observability for advanced developers
This tutorial shows how to add observability to a Haystack pipeline so you can inspect component inputs, outputs, timings, and failures without guessing. You need this when a pipeline works in dev but becomes opaque in staging or production, especially once retrieval, routing, and generation start interacting.
What You'll Need
- Python 3.10+
- `haystack-ai`
- `openai` if you want to use an OpenAI generator
- A valid `OPENAI_API_KEY`
- Basic familiarity with Haystack `Pipeline`, `Document`, and components
- A terminal and a place to run Python scripts
Install the packages:
```shell
pip install haystack-ai openai
```
Step-by-Step
- Start with a small pipeline that has real moving parts.
Observability is only useful if there is something to observe, so we will build a retriever-plus-generator flow instead of a toy single-node script.
```python
import os

from haystack import Document, Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore

# OpenAIChatGenerator reads OPENAI_API_KEY from the environment, so fail
# fast with a clear message instead of a confusing auth error later.
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("Set OPENAI_API_KEY before running this script.")

document_store = InMemoryDocumentStore()
documents = [
    Document(content="Haystack is an open-source framework for building LLM applications."),
    Document(content="Observability helps you inspect latency, inputs, outputs, and failures."),
    Document(content="BM25 is a lexical retrieval algorithm often used for search."),
]
document_store.write_documents(documents)

# ChatPromptBuilder takes a list of ChatMessage templates and outputs a
# list of ChatMessage objects, which is what OpenAIChatGenerator expects
# on its "messages" input. A plain PromptBuilder outputs a string, which
# cannot be connected to a chat generator.
template = [
    ChatMessage.from_user(
        """
Answer the question using only the documents below.

Documents:
{% for doc in documents %}
- {{ doc.content }}
{% endfor %}

Question: {{ question }}
Answer:
"""
    )
]

pipe = Pipeline()
pipe.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
pipe.add_component("prompt_builder", ChatPromptBuilder(template=template))
pipe.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.connect("retriever.documents", "prompt_builder.documents")
pipe.connect("prompt_builder.prompt", "llm.messages")
```
- Wrap each component with timing so you can see where the time goes.
Haystack gives you structured pipeline execution, but if you want operational visibility in your own logs or metrics system, measuring each stage explicitly is still useful.
```python
import time


def timed_call(label, fn):
    start = time.perf_counter()
    result = fn()
    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"[timing] {label}: {elapsed_ms:.2f} ms")
    return result


question = "What does observability help you inspect?"
retrieved = timed_call(
    "retriever",
    lambda: pipe.get_component("retriever").run(query=question),
)
print(retrieved)
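The lambda-based wrapper above also works as a context manager, which reads more naturally when a stage spans several statements. A stdlib-only sketch; `timed` is our helper name, not a Haystack API:

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label: str):
    # Measure wall-clock time for the wrapped block, even if it raises.
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed_ms = (time.perf_counter() - start) * 1000
        print(f"[timing] {label}: {elapsed_ms:.2f} ms")
```

Usage: `with timed("retriever"): retrieved = pipe.get_component("retriever").run(query=question)`.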
- Run the full pipeline and print structured outputs.
The important part here is not just the answer; it is seeing the retrieved documents and the generated response together so you can debug bad retrieval before blaming the model.
```python
question = "What does observability help you inspect?"

# By default, pipe.run() only returns outputs that no downstream component
# consumes. include_outputs_from keeps the retriever's documents in the
# result even though the prompt builder consumes them.
result = pipe.run(
    {
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
    },
    include_outputs_from={"retriever"},
)

print("=== Retrieved Documents ===")
for i, doc in enumerate(result["retriever"]["documents"], start=1):
    print(f"{i}. {doc.content}")

print("\n=== Model Response ===")
print(result["llm"]["replies"][0].text)
```
- Add a lightweight observability layer around the pipeline run.
This gives you request-level tracing with input size, output size, and latency without depending on external infrastructure yet.
```python
import json
import uuid


def traced_run(question: str):
    trace_id = str(uuid.uuid4())
    started_at = time.perf_counter()
    print(json.dumps({
        "event": "pipeline_start",
        "trace_id": trace_id,
        "question": question,
        "question_chars": len(question),
    }))
    # include_outputs_from is needed here too, because the trace event
    # below reports how many documents the retriever returned.
    result = pipe.run(
        {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        },
        include_outputs_from={"retriever"},
    )
    duration_ms = (time.perf_counter() - started_at) * 1000
    answer = result["llm"]["replies"][0].text
    print(json.dumps({
        "event": "pipeline_end",
        "trace_id": trace_id,
        "duration_ms": round(duration_ms, 2),
        "answer_chars": len(answer),
        "documents_returned": len(result["retriever"]["documents"]),
    }))
    return result


traced_run("What does observability help you inspect?")
```
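If the bare `print` calls feel too ad hoc, the same events can go through the stdlib `logging` module, which makes it easy to redirect them to files or a log shipper later. A sketch under our own naming assumptions (the logger name and `log_event` helper are ours, not Haystack's):

```python
import json
import logging

# A dedicated logger keeps observability events separate from app logs.
logger = logging.getLogger("pipeline.observability")
logger.setLevel(logging.INFO)
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(handler)


def log_event(event: str, trace_id: str, **fields) -> None:
    # One JSON object per line keeps output grep- and ingest-friendly.
    logger.info(json.dumps({"event": event, "trace_id": trace_id, **fields}))
```

Swapping `print(json.dumps(...))` for `log_event(...)` changes nothing about the event shape, only where the lines go.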
- Capture failures with enough context to debug them later.
In production, empty retriever results or missing API keys are common failure modes, so log the input and stage name before re-raising.
```python
def safe_run(question: str):
    try:
        return traced_run(question)
    except Exception as exc:
        print(json.dumps({
            "event": "pipeline_error",
            "stage": "pipeline",
            "question": question,
            "error_type": type(exc).__name__,
            "error_message": str(exc),
        }))
        raise


safe_run("Explain Haystack observability in one sentence.")
```
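To make the logged `error_type` actionable on a dashboard, you can map exception class names to coarse failure categories. The category names and the mapping below are illustrative assumptions, not part of Haystack or the OpenAI SDK; adjust them to the exceptions you actually see:

```python
# Hypothetical mapping from exception class names (as logged in
# error_type) to coarse categories for grouping and alerting.
FAILURE_CATEGORIES = {
    "AuthenticationError": "auth",
    "APIConnectionError": "network",
    "APITimeoutError": "network",
    "RateLimitError": "rate_limit",
    "KeyError": "pipeline_wiring",
}


def classify_failure(error_type: str) -> str:
    # Anything unmapped surfaces as "unknown" so new failure modes
    # are visible instead of silently miscategorized.
    return FAILURE_CATEGORIES.get(error_type, "unknown")
```

Adding `"category": classify_failure(type(exc).__name__)` to the `pipeline_error` event lets you alert on auth failures differently from transient network blips.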
Testing It
Run the script with a valid `OPENAI_API_KEY` set in your environment. You should see JSON logs for `pipeline_start` and `pipeline_end`, plus timing output for at least one stage.
If retrieval works correctly, the printed documents should include the sentence about observability helping inspect latency, inputs, outputs, and failures. If the model call fails, your error log should tell you whether it was an auth issue, network issue, or something else upstream.
A good sanity check is to change the question and confirm both retrieval results and latency numbers change accordingly. That tells you your observability signals are actually tied to live execution instead of static prints.
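That sanity check can itself be automated: capture the emitted log lines and verify that every `pipeline_start` has a matching `pipeline_end`. A stdlib sketch; `check_trace_pairs` is our helper name:

```python
import json


def check_trace_pairs(log_lines):
    """Return trace_ids that logged a pipeline_start but no pipeline_end."""
    started, ended = set(), set()
    for line in log_lines:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON lines such as [timing] output
        if event.get("event") == "pipeline_start":
            started.add(event["trace_id"])
        elif event.get("event") == "pipeline_end":
            ended.add(event["trace_id"])
    return started - ended
```

An unmatched trace_id means a run died between the two events, which is exactly where the `pipeline_error` log should point you.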
Next Steps
- Export these logs to OpenTelemetry or your existing metrics stack.
- Add per-component token counts and prompt sizes before calling the generator.
- Replace `InMemoryDocumentStore` with your production store and keep the same tracing pattern around it.
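For the token-count idea, a rough character-based heuristic is enough to spot oversized prompts before they reach the generator; for exact counts you would use the model's own tokenizer (e.g. tiktoken). These helpers are approximations of our own, not an API:

```python
def approx_tokens(text: str) -> int:
    # Rough rule of thumb for English text: ~4 characters per token.
    # Use the model's real tokenizer when accuracy matters.
    return max(1, len(text) // 4)


def prompt_size_fields(prompt: str) -> dict:
    # Fields shaped to merge straight into the JSON log events above.
    return {
        "prompt_chars": len(prompt),
        "prompt_tokens_approx": approx_tokens(prompt),
    }
```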
Keep learning
- The complete AI Agents Roadmap — my full 8-step breakdown
- Free: The AI Agent Starter Kit — PDF checklist + starter code
- Work with me — I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.