Haystack Tutorial (Python): adding observability for advanced developers

By Cyprian AaronsUpdated 2026-04-21
haystackadding-observability-for-advanced-developerspython

This tutorial shows how to add observability to a Haystack pipeline so you can inspect component inputs, outputs, timings, and failures without guessing. You need this when a pipeline works in dev but becomes opaque in staging or production, especially once retrieval, routing, and generation start interacting.

What You'll Need

  • Python 3.10+
  • haystack-ai
  • openai if you want to use an OpenAI generator
  • A valid OPENAI_API_KEY
  • Basic familiarity with Haystack Pipeline, Document, and components
  • A terminal and a place to run Python scripts

Install the packages:

pip install haystack-ai openai

Step-by-Step

  1. Start with a small pipeline that has real moving parts.
    Observability is only useful if there is something to observe, so we will build a retriever-plus-generator flow instead of a toy single-node script.
import os
from haystack import Document, Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "")

document_store = InMemoryDocumentStore()
documents = [
    Document(content="Haystack is an open-source framework for building LLM applications."),
    Document(content="Observability helps you inspect latency, inputs, outputs, and failures."),
    Document(content="BM25 is a lexical retrieval algorithm often used for search."),
]
document_store.write_documents(documents)

template = """
Answer the question using only the documents below.

Documents:
{% for doc in documents %}
- {{ doc.content }}
{% endfor %}

Question: {{ question }}
Answer:
"""

pipe = Pipeline()
pipe.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini"))

pipe.connect("retriever.documents", "prompt_builder.documents")
pipe.connect("prompt_builder.prompt", "llm.messages")
  1. Wrap each component with timing so you can see where the time goes.
    Haystack gives you structured pipeline execution, but if you want operational visibility in your own logs or metrics system, measuring each stage explicitly is still useful.
import time

def timed_call(label, fn):
    start = time.perf_counter()
    result = fn()
    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"[timing] {label}: {elapsed_ms:.2f} ms")
    return result

question = "What does observability help you inspect?"
retrieved = timed_call(
    "retriever",
    lambda: pipe.get_component("retriever").run(query=question),
)
print(retrieved)
  1. Run the full pipeline and print structured outputs.
    The important part here is not just the answer; it is seeing the retrieved documents and the generated response together so you can debug bad retrieval before blaming the model.
question = "What does observability help you inspect?"

result = pipe.run(
    {
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
    }
)

print("=== Retrieved Documents ===")
for i, doc in enumerate(result["retriever"]["documents"], start=1):
    print(f"{i}. {doc.content}")

print("\n=== Model Response ===")
print(result["llm"]["replies"][0].text)
  1. Add a lightweight observability layer around the pipeline run.
    This gives you request-level tracing with input size, output size, and latency without depending on external infrastructure yet.
import json
import uuid

def traced_run(question: str):
    trace_id = str(uuid.uuid4())
    started_at = time.perf_counter()

    print(json.dumps({
        "event": "pipeline_start",
        "trace_id": trace_id,
        "question": question,
        "question_chars": len(question),
    }))

    result = pipe.run(
        {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        }
    )

    duration_ms = (time.perf_counter() - started_at) * 1000
    answer = result["llm"]["replies"][0].text

    print(json.dumps({
        "event": "pipeline_end",
        "trace_id": trace_id,
        "duration_ms": round(duration_ms, 2),
        "answer_chars": len(answer),
        "documents_returned": len(result["retriever"]["documents"]),
    }))

    return result

traced_run("What does observability help you inspect?")
  1. Capture failures with enough context to debug them later.
    In production, empty retriever results or missing API keys are common failure modes, so log the input and stage name before re-raising.
def safe_run(question: str):
    try:
        return traced_run(question)
    except Exception as exc:
        print(json.dumps({
            "event": "pipeline_error",
            "stage": "pipeline",
            "question": question,
            "error_type": type(exc).__name__,
            "error_message": str(exc),
        }))
        raise

safe_run("Explain Haystack observability in one sentence.")

Testing It

Run the script with a valid OPENAI_API_KEY set in your environment. You should see JSON logs for pipeline_start, pipeline_end, plus timing output for at least one stage.

If retrieval works correctly, the printed documents should include the sentence about observability helping inspect latency, inputs, outputs, and failures. If the model call fails, your error log should tell you whether it was an auth issue, network issue, or something else upstream.

A good sanity check is to change the question and confirm both retrieval results and latency numbers change accordingly. That tells you your observability signals are actually tied to live execution instead of static prints.

Next Steps

  • Export these logs to OpenTelemetry or your existing metrics stack.
  • Add per-component token counts and prompt sizes before calling the generator.
  • Replace InMemoryDocumentStore with your production store and keep the same tracing pattern around it.

Keep learning

By Cyprian Aarons, AI Consultant at Topiax.

Want the complete 8-step roadmap?

Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.

Get the Starter Kit

Related Guides