LangChain Tutorial (Python): rate limiting API calls for advanced developers

By Cyprian Aarons · Updated 2026-04-21

This tutorial shows you how to put a real rate limiter in front of LangChain API calls in Python, so your agent stops blowing through provider quotas and failing under load. You’d use this when you have bursty traffic, multiple workers, or expensive models where uncontrolled retries turn into wasted tokens and noisy incidents.

What You'll Need

  • Python 3.10+
  • langchain
  • langchain-openai
  • openai
  • httpx
  • An OpenAI API key set as OPENAI_API_KEY
  • Optional: Redis if you want distributed rate limiting across processes

Step-by-Step

  1. Install the packages and confirm your environment is wired correctly.
pip install langchain langchain-openai openai httpx
export OPENAI_API_KEY="your-key-here"
  2. Start with a simple sliding-window limiter. This example uses a thread-safe in-memory limiter, which is enough for a single process or a local worker pool.
import time
import threading


class RateLimiter:
    """Sliding-window limiter: at most max_calls within any period_seconds window."""

    def __init__(self, max_calls: int, period_seconds: float):
        self.max_calls = max_calls
        self.period_seconds = period_seconds
        self.calls = []  # monotonic timestamps of recent calls
        self.lock = threading.Lock()

    def acquire(self) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                # Keep only the timestamps that still fall inside the window.
                self.calls = [t for t in self.calls if now - t < self.period_seconds]
                if len(self.calls) < self.max_calls:
                    self.calls.append(now)
                    return
                # Window is full: wait until the oldest call ages out.
                sleep_for = self.period_seconds - (now - self.calls[0])

            time.sleep(max(sleep_for, 0.01))
  3. Wrap your LangChain LLM calls with the limiter. The important part is that the limiter sits outside the model call, so every request is gated before it hits the provider.
from langchain_openai import ChatOpenAI


limiter = RateLimiter(max_calls=3, period_seconds=10)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def limited_invoke(prompt: str) -> str:
    limiter.acquire()
    response = llm.invoke(prompt)
    return response.content


if __name__ == "__main__":
    for i in range(5):
        text = limited_invoke(f"Summarize order risk in one sentence. Request {i+1}")
        print(f"{i+1}: {text}")
  4. If you are using LangChain chains, apply the same wrapper at the edge of the chain execution. This keeps your business logic clean and makes rate control reusable across prompts, tools, and agents.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser


prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a concise banking assistant."),
    ("human", "{question}"),
])

chain = prompt | llm | StrOutputParser()

def limited_chain_invoke(question: str) -> str:
    limiter.acquire()
    return chain.invoke({"question": question})


if __name__ == "__main__":
    print(limited_chain_invoke("Explain why rate limiting matters for API reliability."))
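
If you would rather keep the throttle inside the LCEL pipeline than wrap it in a helper function, one option is a small RunnableLambda that acquires the limiter and passes its input through unchanged. This is a sketch built on the limiter, prompt, and llm defined above, not the only way to do it.

from langchain_core.runnables import RunnableLambda


def throttle(inputs: dict) -> dict:
    # Block until the limiter grants a slot, then forward the inputs untouched.
    limiter.acquire()
    return inputs

throttled_chain = RunnableLambda(throttle) | prompt | llm | StrOutputParser()

if __name__ == "__main__":
    print(throttled_chain.invoke({"question": "What does a sliding-window rate limit do?"}))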
  5. Add retry handling for transient provider errors, but keep retries bounded and respectful of your limiter. Retries should not bypass the throttle; they should wait their turn like any other call.
from openai import RateLimitError
import time


def safe_limited_invoke(prompt: str, retries: int = 3) -> str:
    for attempt in range(retries):
        limiter.acquire()
        try:
            return llm.invoke(prompt).content
        except RateLimitError:
            if attempt == retries - 1:
                raise
            time.sleep(2 ** attempt)


if __name__ == "__main__":
    print(safe_limited_invoke("Give me one sentence on claims triage automation."))
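
One refinement worth considering: when several workers hit the provider limit at the same moment, plain exponential backoff makes them all retry in lockstep. Adding random jitter to the sleep spreads the retries out. The variant below is a small sketch of that idea, reusing the limiter and llm from earlier steps.

import random
import time

from openai import RateLimitError


def safe_limited_invoke_jittered(prompt: str, retries: int = 3) -> str:
    for attempt in range(retries):
        limiter.acquire()
        try:
            return llm.invoke(prompt).content
        except RateLimitError:
            if attempt == retries - 1:
                raise
            # Exponential backoff plus up to one second of random jitter.
            time.sleep(2 ** attempt + random.uniform(0, 1))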
  6. For production systems with multiple workers or pods, move the counter out of memory. A Redis-backed limiter gives you shared enforcement across processes instead of each worker pretending it is alone.
import time
import uuid

import redis


class RedisRateLimiter:
    def __init__(self, client: redis.Redis, key: str, max_calls: int, period_seconds: int):
        self.client = client
        self.key = key
        self.max_calls = max_calls
        self.period_seconds = period_seconds

    def acquire(self) -> None:
        while True:
            now = time.time()
            window_start = now - self.period_seconds

            # Drop entries older than the window, then count what remains.
            pipe = self.client.pipeline()
            pipe.zremrangebyscore(self.key, 0, window_start)
            pipe.zcard(self.key)
            _, count = pipe.execute()

            if count < self.max_calls:
                # Use a unique member so two calls in the same second are not
                # collapsed into one sorted-set entry; the score is the timestamp.
                member = f"{now}:{uuid.uuid4()}"
                with self.client.pipeline() as p:
                    p.zadd(self.key, {member: now})
                    p.expire(self.key, self.period_seconds)
                    p.execute()
                return

            time.sleep(0.25)
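
To wire the Redis limiter into the same helper pattern, construct a client and give each traffic domain its own key. The host, port, and key name below are placeholder assumptions; point them at your own Redis instance.

import redis

client = redis.Redis(host="localhost", port=6379, db=0)

shared_limiter = RedisRateLimiter(
    client=client,
    key="ratelimit:openai:gpt-4o-mini",  # one key per model / traffic domain
    max_calls=3,
    period_seconds=10,
)

def shared_limited_invoke(prompt: str) -> str:
    shared_limiter.acquire()
    return llm.invoke(prompt).content

Note that the check-then-add in acquire is not atomic across workers, so a burst of simultaneous callers can slightly overshoot the limit; if you need strict enforcement, moving that logic into a Lua script executed on the Redis server keeps the check and the insert in one step.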

Testing It

Run the script and watch the timestamps between requests; after three calls in ten seconds, the fourth should pause instead of failing immediately. If you want a hard check, print time.monotonic() before each invoke and confirm spacing matches your configured window.
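
A quick way to see that spacing is to log elapsed time before each call. This sketch reuses limited_invoke from step 3; with max_calls=3 and period_seconds=10, requests 4 and 5 should start roughly ten seconds after the first batch.

import time

start = time.monotonic()

if __name__ == "__main__":
    for i in range(5):
        print(f"t+{time.monotonic() - start:.1f}s  sending request {i + 1}")
        limited_invoke(f"One-line summary of rate limiting. Request {i + 1}")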

Also test under concurrency with multiple threads or async workers to make sure only one process-level limiter exists per traffic domain. If you deploy on more than one container or VM, switch to Redis before calling it production-ready.
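
For the concurrency check inside one process, share a single limiter instance across a thread pool and confirm that total throughput stays at or below the configured rate. The sketch below assumes the RateLimiter and limited_invoke definitions from the earlier steps.

from concurrent.futures import ThreadPoolExecutor


def worker(i: int) -> str:
    # Every thread goes through the same limiter instance, so the cap applies
    # to the whole process rather than per thread.
    return limited_invoke(f"One-line note on throttling. Request {i}")

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as pool:
        for result in pool.map(worker, range(6)):
            print(result)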

Next Steps

  • Add per-user and per-route quotas so one noisy tenant does not starve everyone else.
  • Swap the sliding-window log for a token-bucket algorithm when you want to allow short bursts while still capping sustained throughput (a sketch follows this list).
  • Combine rate limiting with circuit breakers and structured retry policies for resilient agent pipelines.
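
As one example of the token-bucket direction mentioned above, the sketch below refills a fixed-capacity bucket at a steady rate and lets callers spend tokens as they arrive. It is a minimal illustration with the same acquire() interface as the limiters in this tutorial, not a drop-in production implementation.

import threading
import time


class TokenBucketLimiter:
    def __init__(self, rate_per_second: float, capacity: int):
        self.rate = rate_per_second   # tokens added per second
        self.capacity = capacity      # maximum burst size
        self.tokens = float(capacity)
        self.updated = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                # Refill based on elapsed time, capped at the bucket capacity.
                self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
                self.updated = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
            time.sleep(0.05)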

Keep learning

By Cyprian Aarons, AI Consultant at Topiax.

Want the complete 8-step roadmap?

Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.

Get the Starter Kit
