LangChain Tutorial (Python): rate limiting API calls for advanced developers
This tutorial shows you how to put a real rate limiter in front of LangChain API calls in Python, so your agent stops blowing through provider quotas and failing under load. You’d use this when you have bursty traffic, multiple workers, or expensive models where uncontrolled retries turn into wasted tokens and noisy incidents.
What You'll Need
- Python 3.10+
- langchain
- langchain-openai
- openai
- httpx
- An OpenAI API key set as OPENAI_API_KEY
- Optional: Redis if you want distributed rate limiting across processes
Step-by-Step
- Install the packages and confirm your environment is wired correctly.

```bash
pip install langchain langchain-openai openai httpx
export OPENAI_API_KEY="your-key-here"
```
- Start with a simple sliding-window limiter. This example uses a thread-safe in-memory limiter, which is enough for a single process or a local worker pool.

```python
import time
import threading


class RateLimiter:
    """Allow at most max_calls per rolling period_seconds window."""

    def __init__(self, max_calls: int, period_seconds: float):
        self.max_calls = max_calls
        self.period_seconds = period_seconds
        self.calls: list[float] = []
        self.lock = threading.Lock()

    def acquire(self) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                # Drop timestamps that have aged out of the window.
                self.calls = [t for t in self.calls if now - t < self.period_seconds]
                if len(self.calls) < self.max_calls:
                    self.calls.append(now)
                    return
                # Time until the oldest call leaves the window.
                sleep_for = self.period_seconds - (now - self.calls[0])
            # Sleep outside the lock so other threads can still check the window.
            time.sleep(max(sleep_for, 0.01))
```
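You can sanity-check the limiter without touching any API. This sketch redeclares the class above so it runs standalone, shrinks the window to half a second, and times three acquisitions; the third call has to wait for the window to roll over.

```python
import time
import threading


class RateLimiter:
    """Allow at most max_calls per rolling period_seconds window."""

    def __init__(self, max_calls: int, period_seconds: float):
        self.max_calls = max_calls
        self.period_seconds = period_seconds
        self.calls: list[float] = []
        self.lock = threading.Lock()

    def acquire(self) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                self.calls = [t for t in self.calls if now - t < self.period_seconds]
                if len(self.calls) < self.max_calls:
                    self.calls.append(now)
                    return
                sleep_for = self.period_seconds - (now - self.calls[0])
            time.sleep(max(sleep_for, 0.01))


# Allow 2 calls per 0.5s window, then time 3 acquisitions.
limiter = RateLimiter(max_calls=2, period_seconds=0.5)
start = time.monotonic()
for _ in range(3):
    limiter.acquire()
elapsed = time.monotonic() - start
# The first two acquire instantly; the third blocks for roughly 0.5s.
print(f"elapsed: {elapsed:.2f}s")
```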
- Wrap your LangChain LLM calls with the limiter. The important part is that the limiter sits outside the model call, so every request is gated before it hits the provider.

```python
from langchain_openai import ChatOpenAI

limiter = RateLimiter(max_calls=3, period_seconds=10)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


def limited_invoke(prompt: str) -> str:
    limiter.acquire()
    response = llm.invoke(prompt)
    return response.content


if __name__ == "__main__":
    for i in range(5):
        text = limited_invoke(f"Summarize order risk in one sentence. Request {i+1}")
        print(f"{i+1}: {text}")
```
- If you are using LangChain chains, apply the same wrapper at the edge of the chain execution. This keeps your business logic clean and makes rate control reusable across prompts, tools, and agents.

```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a concise banking assistant."),
    ("human", "{question}"),
])
chain = prompt | llm | StrOutputParser()


def limited_chain_invoke(question: str) -> str:
    limiter.acquire()
    return chain.invoke({"question": question})


if __name__ == "__main__":
    print(limited_chain_invoke("Explain why rate limiting matters for API reliability."))
```
- Add retry handling for transient provider errors, but keep retries bounded and respectful of your limiter. Retries should not bypass the throttle; they should wait their turn like any other call.

```python
import time

from openai import RateLimitError


def safe_limited_invoke(prompt: str, retries: int = 3) -> str:
    for attempt in range(retries):
        # Every attempt, including retries, goes through the limiter.
        limiter.acquire()
        try:
            return llm.invoke(prompt).content
        except RateLimitError:
            if attempt == retries - 1:
                raise
            # Exponential backoff between attempts: 1s, 2s, 4s, ...
            time.sleep(2 ** attempt)


if __name__ == "__main__":
    print(safe_limited_invoke("Give me one sentence on claims triage automation."))
```
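One refinement worth considering: if many workers all back off with plain exponential delays after a shared 429, they retry in synchronized waves. A jittered schedule spreads them out. The helper below is a hypothetical sketch, not part of LangChain or the OpenAI SDK; `rng` is injectable so the schedule can be tested deterministically.

```python
import random


def backoff_delays(retries: int, base: float = 1.0, cap: float = 30.0, rng=random.random):
    """Yield one 'full jitter' delay per retry: uniform in [0, min(cap, base * 2**attempt)].

    Jitter prevents many workers from retrying in lockstep after a
    shared rate-limit event.
    """
    for attempt in range(retries):
        ceiling = min(cap, base * (2 ** attempt))
        yield rng() * ceiling


# Deterministic check with a fixed "random" source of 1.0,
# which makes each delay equal its ceiling.
delays = list(backoff_delays(4, base=1.0, cap=30.0, rng=lambda: 1.0))
print(delays)  # [1.0, 2.0, 4.0, 8.0]
```

In `safe_limited_invoke`, you would replace the fixed `time.sleep(2 ** attempt)` with a delay drawn from this schedule.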
- For production systems with multiple workers or pods, move the counter out of memory. A Redis-backed limiter gives you shared enforcement across processes instead of each worker pretending it is alone.

```python
import time
import uuid

import redis


class RedisRateLimiter:
    """Sliding-window limiter backed by a Redis sorted set, shared across workers."""

    def __init__(self, client: redis.Redis, key: str, max_calls: int, period_seconds: int):
        self.client = client
        self.key = key
        self.max_calls = max_calls
        self.period_seconds = period_seconds

    def acquire(self) -> None:
        while True:
            now = time.time()
            window_start = now - self.period_seconds
            pipe = self.client.pipeline()
            pipe.zremrangebyscore(self.key, 0, window_start)
            pipe.zcard(self.key)
            _, count = pipe.execute()
            if count < self.max_calls:
                # Unique member so multiple calls in the same instant all count.
                member = f"{now}:{uuid.uuid4().hex}"
                with self.client.pipeline() as p:
                    p.zadd(self.key, {member: now})
                    p.expire(self.key, self.period_seconds)
                    p.execute()
                return
            time.sleep(0.25)
```

Note that the count and the insert are separate round trips, so heavily concurrent workers can briefly overshoot the limit. If you need strict enforcement, wrap both steps in a single Lua script so Redis executes them atomically.
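To see the sliding-window arithmetic without a running Redis, a tiny in-memory stand-in for the three sorted-set operations the limiter uses (zremrangebyscore, zcard, zadd) is enough. The class below is purely illustrative, a hypothetical test double rather than part of redis-py, and uses simplified single-key signatures.

```python
class FakeSortedSet:
    """In-memory stand-in for the Redis sorted-set commands the limiter relies on."""

    def __init__(self):
        self.members: dict[str, float] = {}  # member -> score (timestamp)

    def zremrangebyscore(self, min_score: float, max_score: float) -> int:
        stale = [m for m, s in self.members.items() if min_score <= s <= max_score]
        for m in stale:
            del self.members[m]
        return len(stale)

    def zcard(self) -> int:
        return len(self.members)

    def zadd(self, member: str, score: float) -> None:
        self.members[member] = score


# Simulate a 3-calls-per-10-seconds window at fake clock times 0, 1, 2.
zset = FakeSortedSet()
for i, t in enumerate([0, 1, 2]):
    zset.zremrangebyscore(0, t - 10)  # drop entries older than the window
    assert zset.zcard() < 3           # under the limit, so the call proceeds
    zset.zadd(f"call-{i}", t)

# At t=5 the window still holds all 3 entries, so a 4th call must wait...
zset.zremrangebyscore(0, 5 - 10)
blocked = zset.zcard() >= 3

# ...but at t=11 the entries from t=0 and t=1 have aged out, freeing capacity.
zset.zremrangebyscore(0, 11 - 10)
allowed = zset.zcard() < 3

print(blocked, allowed)  # True True
```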
Testing It
Run the script and watch the timestamps between requests; after three calls in ten seconds, the fourth should pause instead of failing immediately. If you want a hard check, print time.monotonic() before each invoke and confirm spacing matches your configured window.
Also test under concurrency with multiple threads or async workers to make sure only one process-level limiter exists per traffic domain. If you deploy on more than one container or VM, switch to Redis before calling it production-ready.
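A concurrency smoke test along those lines can be run entirely offline by shrinking the window. This sketch redeclares the in-memory limiter so it runs standalone, fires 12 acquisitions from 4 threads at 3 calls per 0.3s, and checks the total runtime: the calls must spread across at least four windows, so the run cannot finish much faster than 0.9s.

```python
import time
import threading


class RateLimiter:
    """Allow at most max_calls per rolling period_seconds window."""

    def __init__(self, max_calls: int, period_seconds: float):
        self.max_calls = max_calls
        self.period_seconds = period_seconds
        self.calls: list[float] = []
        self.lock = threading.Lock()

    def acquire(self) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                self.calls = [t for t in self.calls if now - t < self.period_seconds]
                if len(self.calls) < self.max_calls:
                    self.calls.append(now)
                    return
                sleep_for = self.period_seconds - (now - self.calls[0])
            time.sleep(max(sleep_for, 0.01))


limiter = RateLimiter(max_calls=3, period_seconds=0.3)
timestamps: list[float] = []
ts_lock = threading.Lock()


def worker() -> None:
    for _ in range(3):
        limiter.acquire()
        with ts_lock:
            timestamps.append(time.monotonic())


start = time.monotonic()
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.monotonic() - start

# 12 calls at 3 per 0.3s need at least four windows: ~0s, ~0.3s, ~0.6s, ~0.9s.
print(f"{len(timestamps)} calls in {elapsed:.2f}s")
```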
Next Steps
- Add per-user and per-route quotas so one noisy tenant does not starve everyone else.
- Swap the sliding-window log for a token-bucket algorithm when you want smoother throughput with controlled bursts.
- Combine rate limiting with circuit breakers and structured retry policies for resilient agent pipelines.
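The token-bucket variant can be sketched in a few lines. Unlike the sliding-window log, it stores only two numbers (token count and last refill time) and allows controlled bursts up to `capacity`. The rate and capacity values here are illustrative, not recommendations.

```python
import time
import threading


class TokenBucket:
    """Refills at `rate` tokens/sec up to `capacity`; each acquire spends one token."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.updated = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                # Refill proportionally to elapsed time, clamped at capacity.
                self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
                self.updated = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                # Time until one full token has refilled.
                wait = (1 - self.tokens) / self.rate
            time.sleep(wait)


# 5 tokens/sec with a burst capacity of 2: the first two calls pass
# immediately, the third waits roughly 0.2s for a token to refill.
bucket = TokenBucket(rate=5.0, capacity=2.0)
start = time.monotonic()
for _ in range(3):
    bucket.acquire()
elapsed = time.monotonic() - start
print(f"elapsed: {elapsed:.2f}s")
```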
Keep learning
- The complete AI Agents Roadmap — my full 8-step breakdown
- Free: The AI Agent Starter Kit — PDF checklist + starter code
- Work with me — I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.
Want the complete 8-step roadmap?
Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.