Haystack Tutorial (Python): rate limiting API calls for intermediate developers
This tutorial shows you how to throttle outbound API calls in a Haystack pipeline so your app stays inside vendor quotas and avoids 429s. You’ll wire a small rate limiter into a custom component, then use it before any expensive remote call.
What You'll Need
- Python 3.10+
- haystack-ai
- requests
- An API key for the remote service you want to call
- A basic Haystack pipeline already working locally
- A place to store secrets, like environment variables
Install the packages:
```shell
pip install haystack-ai requests
```
Set your API key before running anything:
```shell
export MY_API_KEY="your-key-here"
```
Step-by-Step
- Start with a simple rate limiter that uses a token bucket. This gives you predictable throughput and is easy to reason about when your pipeline fans out into multiple calls.

```python
import threading
import time


class TokenBucketRateLimiter:
    """Token bucket: allows bursts up to `capacity`, refills at `rate` tokens/sec."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated_at = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self) -> None:
        """Block until a token is available, then consume it."""
        while True:
            with self.lock:
                now = time.monotonic()
                elapsed = now - self.updated_at
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.updated_at = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                wait_time = (1 - self.tokens) / self.rate
            # Sleep outside the lock so other threads can refill and acquire.
            time.sleep(wait_time)
```
- Wrap the limiter in a Haystack component. The component takes an input payload, waits if needed, and passes the payload through unchanged, so it can sit in front of any downstream API component.

```python
from typing import Any, Dict

from haystack import component


@component
class RateLimitGate:
    def __init__(self, rate: float, capacity: int):
        self.limiter = TokenBucketRateLimiter(rate=rate, capacity=capacity)

    @component.output_types(payload=Dict[str, Any])
    def run(self, payload: Dict[str, Any]):
        self.limiter.acquire()  # Block here until the bucket grants a token.
        return {"payload": payload}
```
- Add a real outbound call after the gate. This example uses requests against httpbin.org so you can test safely without burning a real vendor quota.

```python
import os

import requests
from haystack import component


@component
class HttpCaller:
    @component.output_types(response=dict)
    def run(self, payload: dict):
        headers = {"Authorization": f"Bearer {os.environ.get('MY_API_KEY', '')}"}
        response = requests.get(
            "https://httpbin.org/get",
            params=payload,
            headers=headers,
            timeout=10,
        )
        response.raise_for_status()
        return {"response": response.json()}
```
- Wire both components into a pipeline. The gate runs first, so the HTTP call happens only after the limiter allows it.

```python
from haystack import Pipeline

pipe = Pipeline()
pipe.add_component("rate_limit", RateLimitGate(rate=2.0, capacity=2))
pipe.add_component("http_call", HttpCaller())
pipe.connect("rate_limit.payload", "http_call.payload")

result = pipe.run({"rate_limit": {"payload": {"query": "haystack"}}})
print(result["http_call"]["response"]["args"])
```
- If you need per-endpoint limits, use one gate per upstream service. That keeps your OpenAI quota separate from your internal customer API quota and prevents one noisy path from starving another.

```python
pipe = Pipeline()
pipe.add_component("llm_limit", RateLimitGate(rate=1.0, capacity=1))
pipe.add_component("crm_limit", RateLimitGate(rate=5.0, capacity=5))
pipe.add_component("llm_call", HttpCaller())
pipe.add_component("crm_call", HttpCaller())
pipe.connect("llm_limit.payload", "llm_call.payload")
pipe.connect("crm_limit.payload", "crm_call.payload")
```
Testing It
Run the pipeline in a loop and watch the timing between calls. With rate=2.0, you should get an initial burst of two requests and then roughly one request every 0.5 seconds after that.
You can confirm behavior by printing timestamps before and after pipe.run(). If you see 429 Too Many Requests from your real provider, lower the rate or reduce concurrency.
For production checks, add metrics around wait time and total request count. That tells you whether the limiter is protecting you or just hiding a larger throughput problem.
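As a quick sanity check without any network calls, you can time the limiter on its own. The sketch below inlines a minimal copy of the token bucket so it runs standalone; with rate=2.0 and capacity=2, the first two acquires should return immediately and each later acquire should wait roughly 0.5 seconds.

```python
import threading
import time


class TokenBucketRateLimiter:
    # Minimal copy of the limiter from the tutorial so this snippet runs on its own.
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated_at = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                self.tokens = min(self.capacity, self.tokens + (now - self.updated_at) * self.rate)
                self.updated_at = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                wait_time = (1 - self.tokens) / self.rate
            time.sleep(wait_time)


limiter = TokenBucketRateLimiter(rate=2.0, capacity=2)
waits = []
for _ in range(4):
    start = time.monotonic()
    limiter.acquire()
    waits.append(time.monotonic() - start)

# First two calls burst through; the rest are spaced about 0.5 s apart.
print([round(w, 2) for w in waits])
```

If the later waits come out much larger than the nominal spacing, something upstream (GC pauses, a blocked event loop, an over-subscribed worker) is adding latency on top of the limiter.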
Next Steps
- Add retry logic with exponential backoff for transient 429s and 5xx responses.
- Move from a local token bucket to Redis if you need rate limits shared across multiple workers.
- Combine this with Haystack's async execution when you have many independent outbound calls to manage.
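For the first bullet, here is one way to sketch exponential backoff. The helper name, attempt count, and delay values are illustrative, and the `RetryableError` shim stands in for whatever your HTTP client raises on a 429 or 5xx; injecting the `sleep` function keeps the retry logic easy to test.

```python
import random
import time


class RetryableError(Exception):
    """Raised by the caller for responses worth retrying (429, 5xx)."""


def call_with_backoff(fn, max_attempts=4, base_delay=0.5, sleep=time.sleep):
    """Call fn(); on RetryableError wait base_delay * 2**attempt plus a
    little jitter, then try again, up to max_attempts total calls."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except RetryableError:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the last failure.
            sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.1))


# Example: a fake call that fails twice with a "429" before succeeding.
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RetryableError("429 Too Many Requests")
    return "ok"

delays = []
result = call_with_backoff(flaky, sleep=delays.append)  # record sleeps instead of waiting
print(result, len(delays))
```

The jitter matters in production: without it, many workers that were throttled at the same moment all retry at the same moment, too.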
Keep learning
- The complete AI Agents Roadmap — my full 8-step breakdown
- Free: The AI Agent Starter Kit — PDF checklist + starter code
- Work with me — I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.