Haystack Tutorial (Python): adding observability for intermediate developers

By Cyprian Aarons · Updated 2026-04-21

This tutorial shows how to add practical observability to a Haystack pipeline in Python using structured logging, timing, and trace-friendly metadata. You need this when your RAG pipeline starts failing in ways that are hard to debug: slow retrieval, bad answers, missing documents, or inconsistent behavior across environments.

What You'll Need

  • Python 3.10+
  • haystack-ai
  • openai if you want to use an LLM generator
  • An OpenAI API key in OPENAI_API_KEY
  • A terminal and a working virtual environment
  • Basic familiarity with Haystack pipelines, retrievers, and generators

Install the packages:

pip install haystack-ai openai
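
Optional but handy before you start: confirm the key is actually visible to the Python process, so a missing variable shows up as a clear error instead of a vague authentication failure later. This check is not part of Haystack; it is just a plain environment lookup.

import os

# Fail fast if the key is missing; the OpenAI generator reads OPENAI_API_KEY at call time.
if not os.environ.get("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY is not set. Export it before running the tutorial script.")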

Step-by-Step

  1. Start with a small RAG pipeline that we can instrument. The point is not the model quality here; it is making every stage visible so you can inspect what happened when the answer looks wrong.
import os
import time
from typing import Any

from haystack import Document, Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy

document_store = InMemoryDocumentStore()

docs = [
    Document(content="Haystack is a Python framework for building LLM applications."),
    Document(content="Observability helps debug retrieval quality, latency, and prompt issues."),
    Document(content="BM25 is a lexical retrieval method that works well on keyword queries."),
]

# The raw documents are not written here; the pipeline's indexing branch
# (cleaner -> splitter -> writer) will write the cleaned, split documents instead.
  2. Build the pipeline with explicit stages: an indexing branch (cleaner, splitter, writer) and a query branch (retriever, prompt builder, LLM). This gives you clear boundaries for timing and logging each component rather than treating the whole run as one opaque call.
pipeline = Pipeline()

pipeline.add_component("cleaner", DocumentCleaner())
pipeline.add_component("splitter", DocumentSplitter(split_length=1))
pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
pipeline.add_component(
    "prompt_builder",
    PromptBuilder(
        template="""
Use the following documents to answer the question.

Documents:
{% for doc in documents %}
- {{ doc.content }}
{% endfor %}

Question: {{ question }}
Answer:
"""
    ),
)
pipeline.add_component(
    "llm",
    OpenAIGenerator(model="gpt-4o-mini"),
)

pipeline.connect("cleaner.documents", "splitter.documents")
pipeline.connect("splitter.documents", "retriever.query")
pipeline.connect("retriever.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "llm.messages")
  3. Add a tiny observability wrapper around each stage. It logs the runtime in milliseconds and the shape of each stage's output, which is enough to spot most production issues without introducing heavy tracing infrastructure on day one.
def timed_call(name: str, fn: Any, *args: Any, **kwargs: Any) -> Any:
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed_ms = (time.perf_counter() - start) * 1000

    print(f"[{name}] took {elapsed_ms:.2f} ms")

    if isinstance(result, dict):
        print(f"[{name}] keys: {list(result.keys())}")

    return result


question = "What does observability help debug?"
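
If a stage raises, you usually want the failure recorded with the same stage name and timing as the happy path. The variant below (a hypothetical timed_call_safe helper, not anything Haystack provides) logs the error and re-raises, so behavior is otherwise unchanged.

def timed_call_safe(name: str, fn: Any, *args: Any, **kwargs: Any) -> Any:
    # Same idea as timed_call, but failures are logged with the stage name before re-raising.
    start = time.perf_counter()
    try:
        result = fn(*args, **kwargs)
    except Exception as exc:
        elapsed_ms = (time.perf_counter() - start) * 1000
        print(f"[{name}] FAILED after {elapsed_ms:.2f} ms: {type(exc).__name__}: {exc}")
        raise
    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"[{name}] took {elapsed_ms:.2f} ms")
    return result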
  4. Run the stages manually first so you can inspect intermediate outputs before hiding them behind Pipeline.run(). Write the split documents to the store before retrieving so the retriever has something to find. This is the simplest way to catch bad retrieval or malformed prompts because you can see each artifact directly.
cleaned = timed_call("cleaner", pipeline.get_component("cleaner").run, documents=docs)
split_docs = timed_call("splitter", pipeline.get_component("splitter").run, documents=cleaned["documents"])

timed_call(
    "writer",
    pipeline.get_component("writer").run,
    documents=split_docs["documents"],
)

retrieval_result = timed_call(
    "retriever",
    pipeline.get_component("retriever").run,
    query=question,
)

prompt_result = timed_call(
    "prompt_builder",
    pipeline.get_component("prompt_builder").run,
    documents=retrieval_result["documents"],
    question=question,
)

print("\nPROMPT PREVIEW:\n")
print(prompt_result["prompt"])
  5. Now run the full pipeline with the same question and capture its final output. In production you would send these timings and metadata to your logging stack or tracing backend; here we print them so you can see exactly what should be recorded.
result = timed_call(
    "pipeline",
    pipeline.run,
    {
        "cleaner": {"documents": docs},
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
        "llm": {"messages": []},
    },
)

print("\nFINAL RESULT:\n")
print(result)
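
Pipeline.run normally returns only the outputs of leaf components. If your Haystack version supports the include_outputs_from argument, you can capture intermediate outputs from the same run as well, which keeps the single-call convenience while preserving per-stage visibility; treat the snippet below as a version-dependent sketch.

# Optional: capture intermediate outputs in one call (requires include_outputs_from support).
detailed = pipeline.run(
    {
        "cleaner": {"documents": docs},
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
    },
    include_outputs_from={"retriever", "prompt_builder"},
)
print("retrieved:", len(detailed["retriever"]["documents"]))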
  6. If you want better observability later, add structured fields instead of plain prints. The pattern below is what you would feed into JSON logs, OpenTelemetry spans, or Datadog events.
def log_event(event_name: str, **fields: Any) -> None:
    payload = {"event": event_name, **fields}
    print(payload)

log_event(
    "rag_run_complete",
    question=question,
    retrieved_docs=len(retrieval_result["documents"]),
    prompt_chars=len(prompt_result["prompt"]),
)
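
To emit the same events as machine-parseable JSON lines instead of Python dict reprs, the standard library is enough. The helper below (log_event_json is a name made up for this sketch) is one way to do it and is not a Haystack feature.

import json
import logging

logger = logging.getLogger("rag_observability")
logging.basicConfig(level=logging.INFO, format="%(message)s")

def log_event_json(event_name: str, **fields: Any) -> None:
    # One JSON object per line, ready for log shippers and downstream queries.
    logger.info(json.dumps({"event": event_name, **fields}, default=str))

log_event_json(
    "rag_run_complete",
    question=question,
    retrieved_docs=len(retrieval_result["documents"]),
    prompt_chars=len(prompt_result["prompt"]),
)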

Testing It

Run the script and confirm that each stage prints its own timing line. You should also see the retriever returning documents and the prompt builder producing a readable prompt with those documents inserted.

If the retrieval count is zero or the prompt looks empty, your issue is upstream of generation. That is exactly why observability matters here: it tells you whether the problem is cleaning, splitting, retrieval, or prompting instead of forcing you to guess.
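
A couple of cheap assertions make that upstream check automatic when you rerun the script; these are plain Python assertions, not a Haystack API.

# Fail loudly when the problem is upstream of generation.
assert retrieval_result["documents"], "Retriever returned no documents; check indexing and splitting."
assert prompt_result["prompt"].strip(), "Prompt is empty; check the PromptBuilder template and inputs."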

If you have an OpenAI key configured correctly, the final pipeline run should include generated replies from the llm component (OpenAIGenerator returns them under the replies key). If it fails at that step but earlier stages work, your logs will already show that the failure is isolated to generation rather than document handling.

Next Steps

  • Add OpenTelemetry spans around each component instead of printing timings (see the sketch after this list).
  • Send structured JSON logs to your centralized logging system.
  • Track retrieval metrics like top-k score distribution and empty-result rate.
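
A minimal sketch of what the first bullet could look like, assuming you install opentelemetry-sdk; the tracer, span, and attribute names here are made up for illustration, and only one stage is shown.

# pip install opentelemetry-sdk
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("rag_pipeline")

with tracer.start_as_current_span("retriever") as span:
    retrieval = pipeline.get_component("retriever").run(query=question)
    span.set_attribute("retrieved_docs", len(retrieval["documents"]))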
