Haystack Tutorial (Python): streaming agent responses for intermediate developers

By Cyprian Aarons · Updated 2026-04-21

This tutorial shows you how to stream agent responses from a Haystack pipeline in Python, token by token, instead of waiting for the full answer. You need this when you’re building chat UIs, support copilots, or internal tools where latency matters and users should see the model start responding immediately.

What You'll Need

  • Python 3.10+
  • haystack-ai
  • An OpenAI API key
  • A shell with environment variables set
  • Basic familiarity with Haystack Pipeline and ChatMessage

Install the package:

pip install haystack-ai

Set your API key:

export OPENAI_API_KEY="your-key-here"

Step-by-Step

  1. Start by creating a minimal generator component that calls an LLM through Haystack. This baseline returns the complete reply as a single string; in step 3 you'll swap in a streaming callback so partial chunks arrive as the model generates them.
import os
from typing import List

from haystack import component
from haystack.dataclasses import ChatMessage
from haystack.components.generators.chat import OpenAIChatGenerator

@component
class StreamingResponder:
    def __init__(self):
        # streaming_callback=None keeps this baseline non-streaming;
        # step 3 replaces it with a real callback.
        self.generator = OpenAIChatGenerator(
            model="gpt-4o-mini",
            streaming_callback=None,
        )

    @component.output_types(reply=str)
    def run(self, messages: List[ChatMessage]):
        # Run the chat model and return only the text of the first reply.
        result = self.generator.run(messages=messages)
        return {"reply": result["replies"][0].text}
  2. Next, wire it into a pipeline and send a simple user message. This gives you a baseline non-streaming response so you can confirm the model and credentials are working before you add event handling.
from haystack import Pipeline

pipe = Pipeline()
pipe.add_component("responder", StreamingResponder())

messages = [
    ChatMessage.from_system("You are a concise assistant."),
    ChatMessage.from_user("Explain what streaming means in one sentence."),
]

result = pipe.run(
    data={
        "responder": {
            "messages": messages
        }
    }
)

print(result["responder"]["reply"])
  3. Now switch to native streaming on the generator. In Haystack, the cleanest pattern is to pass a streaming_callback that is invoked with a StreamingChunk object for each piece of the response as it arrives; read the chunk's content attribute for the text and append it to your own buffer.
from haystack.dataclasses import StreamingChunk

chunks = []

def on_chunk(chunk: StreamingChunk):
    # The callback receives a StreamingChunk per partial piece of the reply;
    # the text lives in chunk.content.
    print(chunk.content, end="", flush=True)
    chunks.append(chunk.content)

streaming_generator = OpenAIChatGenerator(
    model="gpt-4o-mini",
    streaming_callback=on_chunk,
)

messages = [
    ChatMessage.from_system("You are a concise assistant."),
    ChatMessage.from_user("List three use cases for response streaming."),
]

response = streaming_generator.run(messages=messages)
print("\n\nFinal reply:")
print(response["replies"][0].text)
  4. If you want this inside a real agent flow, keep the callback at the edge and let your application own the stream buffer. That makes it easier to push tokens to WebSockets, Server-Sent Events, or a terminal UI without coupling presentation logic to your Haystack components; a queue-based bridge for exactly that is sketched after this step's code.
from haystack.dataclasses import StreamingChunk
from haystack.components.generators.chat import OpenAIChatGenerator

class TokenBuffer:
    def __init__(self):
        self.parts = []

    def write(self, chunk: StreamingChunk):
        # Store only the text of each chunk, not the chunk object itself.
        self.parts.append(chunk.content)

    @property
    def text(self) -> str:
        return "".join(self.parts)

buffer = TokenBuffer()

generator = OpenAIChatGenerator(
    model="gpt-4o-mini",
    streaming_callback=buffer.write,
)

messages = [
    ChatMessage.from_user("Write a short status update about delayed payments."),
]

result = generator.run(messages=messages)
print(buffer.text)
print(result["replies"][0].text)
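Most SSE and WebSocket frameworks want to pull from an iterator, while Haystack pushes into a callback. A minimal sketch of bridging the two with a queue follows; iter_tokens, the sentinel object, and the worker thread are illustrative choices for this tutorial, not part of the Haystack API.

import queue
import threading

from haystack.dataclasses import ChatMessage, StreamingChunk
from haystack.components.generators.chat import OpenAIChatGenerator

def iter_tokens(messages):
    """Run the generator in a background thread and yield text chunks as they arrive."""
    q: queue.Queue = queue.Queue()
    sentinel = object()  # signals the end of the stream

    def on_chunk(chunk: StreamingChunk):
        q.put(chunk.content)

    generator = OpenAIChatGenerator(
        model="gpt-4o-mini",
        streaming_callback=on_chunk,
    )

    def worker():
        try:
            generator.run(messages=messages)
        finally:
            q.put(sentinel)  # always unblock the consumer, even on errors

    threading.Thread(target=worker, daemon=True).start()

    while True:
        item = q.get()
        if item is sentinel:
            break
        yield item

# Each yielded token can be written to a WebSocket, an SSE response, or a terminal.
for token in iter_tokens([ChatMessage.from_user("Say hello in five words.")]):
    print(token, end="", flush=True)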
  5. Finally, wrap the pattern in a small helper so your app can reuse it across endpoints. This is the version you actually want in production: one place for prompt assembly, one place for buffering, and one place for transport-specific output.
def stream_chat(messages):
    emitted = []

    def collect(chunk: StreamingChunk):
        # Accumulate and echo each chunk's text as it arrives.
        emitted.append(chunk.content)
        print(chunk.content, end="", flush=True)

    generator = OpenAIChatGenerator(
        model="gpt-4o-mini",
        streaming_callback=collect,
    )

    result = generator.run(messages=messages)
    return "".join(emitted), result["replies"][0].text


messages = [
    ChatMessage.from_system("You answer like an insurance operations analyst."),
    ChatMessage.from_user("Summarize why claim triage matters."),
]

streamed_text, final_text = stream_chat(messages)
print("\n---")
print(streamed_text == final_text)

Testing It

Run the script from your terminal and watch for partial output appearing before the final line prints. If streaming is working, you should see tokens arrive incrementally rather than all at once after a pause.

Also verify that streamed_text and final_text match at the end of execution. If they don't, check that your callback appends chunk.content for every chunk and that nothing in your app is buffering stdout.

If you get authentication errors, confirm OPENAI_API_KEY is set in the same shell session where you run Python. If you get import errors, make sure haystack-ai is installed in the active virtual environment.
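For a quick automated check, you can turn that comparison into an assertion. This sketch reuses the stream_chat helper from step 5; the prompt is arbitrary.

# Sanity check: the concatenated chunks should equal the final reply text.
streamed_text, final_text = stream_chat(
    [ChatMessage.from_user("Reply with the single word: ready")]
)
assert streamed_text == final_text, "callback missed chunks or stdout is buffered"
print("\nstreaming OK")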

Next Steps

  • Add Server-Sent Events or WebSocket delivery so streamed tokens reach your frontend in real time.
  • Wrap this pattern in an HTTP endpoint using FastAPI and return chunks as they arrive (a sketch follows this list).
  • Extend the pipeline with retrieval so streamed answers are grounded in internal documents before generation starts.
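
For the FastAPI route mentioned above, one minimal sketch looks like this. It reuses the iter_tokens bridge from step 4; the endpoint path and request model are illustrative, and real SSE delivery would also need text/event-stream framing rather than plain chunked text.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from haystack.dataclasses import ChatMessage

app = FastAPI()

class ChatRequest(BaseModel):
    question: str

@app.post("/chat/stream")
def chat_stream(request: ChatRequest):
    # iter_tokens is the queue-based bridge sketched in step 4.
    messages = [ChatMessage.from_user(request.question)]
    return StreamingResponse(iter_tokens(messages), media_type="text/plain")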
