Haystack Tutorial (Python): rate limiting API calls for advanced developers

By Cyprian Aarons · Updated 2026-04-21

This tutorial shows how to rate limit outbound API calls in a Haystack pipeline using Python, so you can keep LLM and tool traffic under vendor quotas without dropping requests. You need this when multiple agents, retrievers, or tools can spike traffic and trigger 429s, throttling, or account suspension.

What You'll Need

  • Python 3.10+
  • haystack-ai
  • httpx
  • An API key for the external service you want to call
  • A basic Haystack pipeline already working
  • Optional: python-dotenv if you want to load secrets from a .env file

Install the packages:

pip install haystack-ai httpx python-dotenv

Step-by-Step

  1. First, define a small rate limiter that enforces a minimum interval between calls. This is the simplest production-safe pattern when you only need to protect one downstream API from bursts.
import time
import threading


class RateLimiter:
    def __init__(self, calls_per_second: float):
        self.min_interval = 1.0 / calls_per_second
        self._lock = threading.Lock()
        self._last_call = 0.0

    def wait(self) -> None:
        with self._lock:
            now = time.monotonic()
            elapsed = now - self._last_call
            sleep_for = max(0.0, self.min_interval - elapsed)
            if sleep_for > 0:
                time.sleep(sleep_for)
            self._last_call = time.monotonic()
  2. Next, wrap your API call in a Haystack component. In Haystack, a component is a plain Python class decorated with @component, with its outputs declared via @component.output_types on the run method, which makes it easy to compose inside pipelines.
import os
import httpx
from haystack import component


@component
class LimitedHttpCaller:
    def __init__(self, api_key: str, limiter: RateLimiter):
        self.api_key = api_key
        self.limiter = limiter

    @component.output_types(response=str)
    def run(self, url: str) -> dict:
        self.limiter.wait()
        headers = {"Authorization": f"Bearer {self.api_key}"}
        with httpx.Client(timeout=30.0) as client:
            resp = client.get(url, headers=headers)
            resp.raise_for_status()
            return {"response": resp.text}
  3. Now wire the component into a pipeline. This example uses a simple input generator so you can see the limiter working even before plugging it into a real agent flow.
from haystack import Pipeline, component


@component
class UrlSource:
    @component.output_types(url=str)
    def run(self) -> dict:
        return {"url": "https://httpbin.org/get"}


pipe = Pipeline()
pipe.add_component("source", UrlSource())
pipe.add_component(
    "caller",
    LimitedHttpCaller(
        api_key=os.environ.get("API_KEY", "demo-key"),
        limiter=RateLimiter(calls_per_second=2),
    ),
)
pipe.connect("source.url", "caller.url")

result = pipe.run({})
print(result["caller"]["response"][:200])
  4. If you need throttling across multiple concurrent tasks, reuse the same limiter instance everywhere that hits the vendor API; its internal lock already makes it safe to share across threads. That gives you global throttling instead of per-component throttling, which is what usually matters in agent systems.
from concurrent.futures import ThreadPoolExecutor


limiter = RateLimiter(calls_per_second=2)
caller = LimitedHttpCaller(
    api_key=os.environ.get("API_KEY", "demo-key"),
    limiter=limiter,
)

def fetch_one(_: int) -> str:
    output = caller.run("https://httpbin.org/get")
    return output["response"]

with ThreadPoolExecutor(max_workers=5) as pool:
    responses = list(pool.map(fetch_one, range(5)))

print(len(responses))
  5. Finally, add retry logic for real-world 429s. Rate limiting reduces pressure; retries handle cases where the provider still rejects requests because of shared tenant limits or upstream contention.
import time


def get_with_retry(caller: LimitedHttpCaller, url: str, attempts: int = 3) -> str:
    for attempt in range(1, attempts + 1):
        try:
            return caller.run(url)["response"]
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code != 429 or attempt == attempts:
                raise
            time.sleep(2 ** attempt)  # exponential backoff: 4s, then 8s


print(get_with_retry(caller, "https://httpbin.org/get"))

Testing It

Run the threaded example and watch how long five calls take with a calls_per_second=2 limiter. You should see total runtime increase instead of all requests firing at once.
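A quick way to check this without the network is to time the limiter directly. The sketch below inlines a copy of the RateLimiter from step 1 so it runs standalone:

```python
import threading
import time


class RateLimiter:
    # Same minimum-interval limiter as in step 1, inlined so this runs alone.
    def __init__(self, calls_per_second: float):
        self.min_interval = 1.0 / calls_per_second
        self._lock = threading.Lock()
        self._last_call = 0.0

    def wait(self) -> None:
        with self._lock:
            elapsed = time.monotonic() - self._last_call
            sleep_for = max(0.0, self.min_interval - elapsed)
            if sleep_for > 0:
                time.sleep(sleep_for)
            self._last_call = time.monotonic()


limiter = RateLimiter(calls_per_second=2)
start = time.monotonic()
for _ in range(5):
    limiter.wait()
elapsed = time.monotonic() - start
# The first call passes immediately; the next four each wait ~0.5s,
# so five calls should take at least ~2 seconds in total.
print(f"elapsed: {elapsed:.2f}s")
```

If the five waits finish in well under two seconds, the limiter is not being shared correctly.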

To verify correctness under load, temporarily lower the rate to 1 and send several parallel requests; each request should block until the previous one has cleared the interval.

If your upstream API returns 429, confirm your retry path backs off instead of hammering the endpoint again immediately.

For production validation, log timestamps before and after limiter.wait() and compare them against your vendor quota window.

Next Steps

  • Add token-bucket rate limiting if you need controlled bursts instead of fixed spacing.
  • Move this pattern into a shared Haystack component library so every agent uses the same throttle policy.
  • Combine rate limiting with circuit breakers and structured retries for stricter vendor protection.
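As a starting point for the first item, a token bucket permits short bursts up to a fixed capacity while still enforcing the average rate. The sketch below is one minimal thread-safe version; the class and parameter names are illustrative, not part of Haystack:

```python
import threading
import time


class TokenBucket:
    """Allows bursts up to `capacity`, refilling at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self._tokens = float(capacity)  # start full so bursts pass immediately
        self._last = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self) -> None:
        with self._lock:
            while True:
                now = time.monotonic()
                # Accrue tokens for the time elapsed since the last refill.
                self._tokens = min(
                    self.capacity, self._tokens + (now - self._last) * self.rate
                )
                self._last = now
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                # Sleep just long enough for one token to accrue.
                time.sleep((1 - self._tokens) / self.rate)


bucket = TokenBucket(rate=2, capacity=5)
start = time.monotonic()
for _ in range(5):
    bucket.acquire()  # the first five pass instantly: the bucket starts full
burst = time.monotonic() - start
bucket.acquire()      # the sixth must wait ~0.5s for a token to refill
total = time.monotonic() - start
print(f"burst: {burst:.2f}s, with sixth call: {total:.2f}s")
```

Swapping TokenBucket for RateLimiter in LimitedHttpCaller only requires renaming wait() to acquire(), since both expose a single blocking call.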


By Cyprian Aarons, AI Consultant at Topiax.
