LlamaIndex Tutorial (Python): rate limiting API calls for advanced developers

By Cyprian Aarons · Updated 2026-04-21

This tutorial shows how to wrap LlamaIndex API calls with a real rate limiter so your app stops hammering OpenAI, Anthropic, or any other upstream provider. You need this when you’re running multi-user agents, batch jobs, or tool-heavy workflows where retries and concurrency can easily blow through vendor limits.

What You'll Need

  • Python 3.10+
  • llama-index
  • llama-index-llms-openai
  • llama-index-embeddings-openai
  • openai
  • tenacity
  • An OpenAI API key in OPENAI_API_KEY
  • Optional: Redis if you want distributed rate limiting later

Install the packages:

pip install llama-index llama-index-llms-openai llama-index-embeddings-openai openai tenacity

Step-by-Step

  1. Start by setting up LlamaIndex with a normal OpenAI LLM and embedding model. This gives you a baseline client that we’ll wrap with rate limiting instead of modifying LlamaIndex internals.
import os
from llama_index.core import Settings
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding

os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "")

Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0)
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")

print("LlamaIndex configured")
  2. Add a token-bucket limiter for request pacing. This version is simple, thread-safe, and works well when you want to cap calls per second in a single process before they hit the provider.
import time
import threading

class TokenBucketRateLimiter:
    """Allows bursts up to `capacity`, then refills at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum burst size
        self.tokens = capacity
        self.updated_at = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self) -> None:
        while True:
            with self.lock:
                # Refill the bucket based on time elapsed since the last update.
                now = time.monotonic()
                elapsed = now - self.updated_at
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.updated_at = now

                if self.tokens >= 1:
                    self.tokens -= 1
                    return

                wait_time = (1 - self.tokens) / self.rate

            # Sleep outside the lock so other threads can refill and acquire.
            time.sleep(wait_time)
  3. Wrap the LLM call path so every completion request passes through the limiter. The important part is that your business code calls one function, not the raw provider client.
from llama_index.core.llms import ChatMessage

limiter = TokenBucketRateLimiter(rate=2.0, capacity=2)

def limited_chat(prompt: str) -> str:
    limiter.acquire()
    response = Settings.llm.chat([ChatMessage(role="user", content=prompt)])
    return response.message.content or ""

result = limited_chat("Write one sentence about why rate limiting matters.")
print(result)
  4. Apply the same pattern to embeddings, since vector indexing can trigger bursts of API calls. If you ingest documents in parallel without this guard, embeddings are usually the first place you hit limits.
from typing import List

def limited_embed(texts: List[str]) -> List[List[float]]:
    # One token per wrapper call; note that LlamaIndex may split very large
    # inputs into multiple API requests internally, so keep batches small.
    limiter.acquire()
    return Settings.embed_model.get_text_embedding_batch(texts)

vectors = limited_embed([
    "First document chunk",
    "Second document chunk",
])

print(len(vectors), len(vectors[0]))
  5. Add retry handling for transient 429s and server errors. Rate limiting reduces pressure, but retries still matter because providers can reject requests even when your client behaves well.
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    # Retrying on bare Exception is deliberately broad for this demo;
    # the sketch below narrows it to provider-specific errors.
    retry=retry_if_exception_type(Exception),
)
def robust_limited_chat(prompt: str) -> str:
    limiter.acquire()
    response = Settings.llm.chat([ChatMessage(role="user", content=prompt)])
    return response.message.content or ""

print(robust_limited_chat("Give me a short answer about backoff strategy."))
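Retrying on bare Exception also retries genuine bugs like TypeErrors. A narrower sketch, assuming the openai v1 SDK (which raises openai.RateLimitError for 429s and openai.APITimeoutError for timeouts) and that those exceptions propagate through LlamaIndex's wrapper; LlamaIndex's OpenAI class also has its own max_retries setting, which you may want to lower to avoid stacked retries:

import openai

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    # Only retry plausibly transient failures; let auth and
    # bad-request errors fail fast.
    retry=retry_if_exception_type((openai.RateLimitError, openai.APITimeoutError)),
)
def narrow_retry_chat(prompt: str) -> str:
    limiter.acquire()
    response = Settings.llm.chat([ChatMessage(role="user", content=prompt)])
    return response.message.content or ""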
  6. If you want this in an index-backed workflow, use the same wrapper inside retrieval or synthesis code paths. The pattern stays the same: keep all provider access behind one boundary so you can enforce policy consistently.
from llama_index.core import VectorStoreIndex, Document

docs = [
    Document(text="LlamaIndex helps build retrieval applications."),
    Document(text="Rate limiting protects external APIs from overload."),
]

# Building the index triggers a batch of embedding calls, so pace it too.
limiter.acquire()
index = VectorStoreIndex.from_documents(docs)
query_engine = index.as_query_engine()

# A query triggers an embedding call and at least one LLM call; this single
# acquire() is a coarse guard. For per-call enforcement, wire the limiter
# into the LLM and embedding wrappers from steps 3 and 4.
limiter.acquire()
response = query_engine.query("What protects external APIs from overload?")
print(response)

Testing It

Run the script and watch the output timing when you call limited_chat() several times in a loop. With rate=2.0 and capacity=2, the first two requests should go through immediately, then later calls should pause instead of spiking traffic.

To verify behavior under load, lower the rate to something obvious like rate=0.5 and make five calls back-to-back. You should see roughly two seconds between allowed requests after the initial burst; the harness below checks this without spending API calls.
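A quick harness for that check, exercising acquire() directly so network latency doesn't blur the timing (the slow_limiter name is just for this test):

import time

# An obviously slow rate so the pacing is visible in the timestamps.
slow_limiter = TokenBucketRateLimiter(rate=0.5, capacity=2)

start = time.monotonic()
for i in range(5):
    slow_limiter.acquire()
    print(f"acquire {i} granted at t+{time.monotonic() - start:.1f}s")

The first two grants should land at roughly t+0.0s (the initial burst), with the remaining three spaced about two seconds apart.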

If you’re using embeddings in ingestion jobs, print timestamps around limited_embed() and confirm it spaces out batch calls as expected. In production, add logs for wait time and request count so you can prove the limiter is doing real work.
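One way to capture those timestamps and wait-time logs, assuming Python's standard logging module; timed_embed is an illustrative name for a drop-in replacement for limited_embed:

import logging
import time
from typing import List

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("rate_limit")

def timed_embed(texts: List[str]) -> List[List[float]]:
    queued = time.monotonic()
    limiter.acquire()
    waited = time.monotonic() - queued
    # Log batch size and token wait so throttling shows up in production logs.
    log.info("embed batch of %d texts, waited %.2fs for a token", len(texts), waited)
    return Settings.embed_model.get_text_embedding_batch(texts)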

Next Steps

  • Move the token bucket into Redis so multiple workers share one global limit.
  • Add separate limits per endpoint: chat completions, embeddings, tool calls (see the sketch after this list).
  • Wire limiter metrics into Prometheus so you can track throttling before users feel it.
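A minimal sketch of the per-endpoint idea, reusing TokenBucketRateLimiter from step 2; the rates here are placeholders, not your provider's real quotas:

# One bucket per upstream endpoint so embedding bursts can't
# starve chat completions, and vice versa.
LIMITERS = {
    "chat": TokenBucketRateLimiter(rate=2.0, capacity=4),
    "embeddings": TokenBucketRateLimiter(rate=5.0, capacity=10),
    "tools": TokenBucketRateLimiter(rate=1.0, capacity=2),
}

def acquire(endpoint: str) -> None:
    LIMITERS[endpoint].acquire()

Then call acquire("chat") or acquire("embeddings") inside the wrappers from steps 3 and 4 instead of sharing one global bucket.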

By Cyprian Aarons, AI Consultant at Topiax.
