Haystack Tutorial (Python): adding observability for beginners

By Cyprian Aarons · Updated 2026-04-21

This tutorial shows you how to add basic observability to a Haystack pipeline in Python so you can see what each component is doing, how long it takes, and where failures happen. You need this when your RAG or agent workflow works locally but becomes hard to debug once you start chaining retrievers, generators, and external APIs.

What You'll Need

  • Python 3.10+
  • haystack-ai
  • An OpenAI API key if you want to use a real generator
  • A working Haystack pipeline with at least one retriever and one generator (step 1 builds a minimal one if you don't have your own)
  • Basic familiarity with Haystack Pipeline, DocumentStore, and components

Install the packages first:

pip install haystack-ai openai

Set your API key:

export OPENAI_API_KEY="your-key-here"

Step-by-Step

  1. Start with a small Haystack pipeline that we can observe.
    The point is not to build a fancy app yet, but to create a realistic flow with retrieval and generation.
from haystack import Document, Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="Haystack is an open-source framework for building LLM applications."),
    Document(content="Observability helps you debug latency, errors, and retrieval quality."),
])

retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = ChatPromptBuilder(
    template=[ChatMessage.from_user(
        "Answer the question using these documents:\n"
        "{% for doc in documents %}- {{ doc.content }}\n{% endfor %}\n"
        "Question: {{ question }}\nAnswer:"
    )]
)

generator = OpenAIChatGenerator(model="gpt-4o-mini")

pipeline = Pipeline()
pipeline.add_component("retriever", retriever)
pipeline.add_component("prompt_builder", prompt_builder)
pipeline.add_component("generator", generator)

pipeline.connect("retriever.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "generator.messages")
  2. Add observability by wrapping the pipeline run with structured logging and timing.
    For beginners, this is the fastest way to get useful traces without changing Haystack internals.
import json
import time

def run_with_observability(question: str):
    started_at = time.perf_counter()

    print(json.dumps({
        "event": "pipeline_started",
        "question": question,
    }))

    result = pipeline.run(
        {
            "retriever": {"query": question},
            "prompt_builder": {"question": question},
        },
        # The retriever's documents flow into the prompt builder, so ask the
        # pipeline to also surface them in the result for logging below.
        include_outputs_from={"retriever"},
    )

    elapsed_ms = round((time.perf_counter() - started_at) * 1000, 2)

    print(json.dumps({
        "event": "pipeline_finished",
        "elapsed_ms": elapsed_ms,
        "retrieved_docs": len(result["retriever"]["documents"]),
    }))

    return result

if __name__ == "__main__":
    answer = run_with_observability("What does observability help with?")
    print(answer["generator"]["replies"][0].text)
  3. Capture per-component inputs and outputs so you can debug failures faster.
    This is where observability becomes useful: if retrieval looks fine but generation fails, you want evidence instead of guesswork.
def log_documents(docs):
    return [
        {"content": doc.content[:80], "score": getattr(doc, "score", None)}
        for doc in docs
    ]

def run_with_component_logs(question: str):
    start = time.perf_counter()

    retrieval_result = retriever.run(query=question)
    print(json.dumps({
        "component": "retriever",
        "documents": log_documents(retrieval_result["documents"]),
    }))

    prompt_result = prompt_builder.run(
        documents=retrieval_result["documents"],
        question=question,
    )
    print(json.dumps({
        "component": "prompt_builder",
        "prompt_preview": prompt_result["prompt"][:200],
    }))

    gen_result = generator.run(messages=prompt_result["prompt"])
    total_ms = round((time.perf_counter() - start) * 1000, 2)

    print(json.dumps({
        "component": "generator",
        "elapsed_ms": total_ms,
        "replies_count": len(gen_result["replies"]),
    }))

run_with_component_logs("Why do teams add observability?")
  4. Add error reporting around each step so failures are visible in logs.
    In production, this matters more than pretty output because silent failures are expensive.
def safe_run(question: str):
    try:
        retrieval_result = retriever.run(query=question)
        prompt_result = prompt_builder.run(
            documents=retrieval_result["documents"],
            question=question,
        )

        return generator.run(messages=prompt_result["prompt"])

    except Exception as exc:
        print(json.dumps({
            "event": "pipeline_error",
            "error_type": type(exc).__name__,
            "error_message": str(exc),
            "question": question,
        }))
        raise

safe_run("Show me an example of observability.")
  5. If you want real trace data later, export the same events into your logging stack or tracing backend.
    The pattern stays the same: emit timestamps, component names, inputs, outputs, and exceptions in a machine-readable format.
import logging

logger = logging.getLogger("haystack_observability")
logging.basicConfig(level=logging.INFO)

def instrumented_retrieve(query: str):
    t0 = time.perf_counter()
    result = retriever.run(query=query)
    logger.info(
        json.dumps({
            "component": "retriever",
            "query": query,
            "docs_found": len(result["documents"]),
            "elapsed_ms": round((time.perf_counter() - t0) * 1000, 2),
        })
    )
    return result

instrumented_retrieve("What is Haystack?")

Testing It

Run the script from top to bottom and watch the console output. You should see JSON logs for pipeline start, component-level activity, and pipeline completion.

If you have a valid OpenAI key, the generator should return a real answer instead of failing on authentication. If something breaks, the error log should tell you exactly which step failed.

A good sanity check is to change the query and confirm that retrieved document counts or prompt previews change accordingly. That tells you your observability hooks are capturing real runtime behavior, not just static prints.
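
If you want that check as code, here is a small sketch; compare_queries is a hypothetical helper, not part of Haystack, that reuses the instrumented_retrieve function and json import from step 5 and logs how the retrieved documents differ between two queries.

def compare_queries(query_a: str, query_b: str):
    # Hypothetical sanity-check helper: run the instrumented retriever twice
    # and log whether the results actually change with the query.
    docs_a = instrumented_retrieve(query_a)["documents"]
    docs_b = instrumented_retrieve(query_b)["documents"]
    print(json.dumps({
        "event": "sanity_check",
        "query_a_docs": len(docs_a),
        "query_b_docs": len(docs_b),
        "top_doc_changed": bool(docs_a and docs_b and docs_a[0].id != docs_b[0].id),
    }))

compare_queries("What is Haystack?", "What does observability help with?")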

Next Steps

  • Add OpenTelemetry spans around each component so traces show up in Grafana Tempo or Jaeger (see the first sketch after this list).
  • Send JSON logs to Loki or Elasticsearch instead of stdout.
  • Wrap this pattern into a reusable middleware layer for all your Haystack pipelines (the second sketch below shows one way to start).
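
As a starting point for the OpenTelemetry item, here is a minimal sketch, assuming the opentelemetry-sdk package is installed. It wraps only the retriever and exports spans to the console; pointing an OTLP exporter at Tempo or Jaeger follows the same pattern.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Configure a tracer that prints spans to the console; swap the exporter
# for an OTLP exporter to ship spans to Tempo or Jaeger.
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
tracer = trace.get_tracer("haystack_observability")

def traced_retrieve(query: str):
    with tracer.start_as_current_span("retriever") as span:
        span.set_attribute("query", query)
        result = retriever.run(query=query)
        span.set_attribute("docs_found", len(result["documents"]))
        return result

traced_retrieve("What is Haystack?")

For the reusable middleware idea, here is a hedged sketch of a generic wrapper; observe_run is a made-up name, not a Haystack API, and it reuses the logger, json, and time setup from step 5.

def observe_run(pipeline, inputs: dict, name: str = "pipeline"):
    # Generic wrapper: time the run, log success or failure, re-raise errors.
    t0 = time.perf_counter()
    try:
        result = pipeline.run(inputs)
        logger.info(json.dumps({
            "pipeline": name,
            "status": "ok",
            "elapsed_ms": round((time.perf_counter() - t0) * 1000, 2),
        }))
        return result
    except Exception as exc:
        logger.info(json.dumps({
            "pipeline": name,
            "status": "error",
            "error_type": type(exc).__name__,
            "elapsed_ms": round((time.perf_counter() - t0) * 1000, 2),
        }))
        raise

observe_run(pipeline, {
    "retriever": {"query": "What is Haystack?"},
    "prompt_builder": {"question": "What is Haystack?"},
}, name="rag_demo")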

Keep learning

By Cyprian Aarons, AI Consultant at Topiax.

Want the complete 8-step roadmap?

Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.

