LangGraph Tutorial (Python): rate limiting API calls for advanced developers
This tutorial shows how to rate limit API calls inside a LangGraph workflow in Python without blocking the whole app or letting one bursty agent burn through your quota. Use this when your graph fans out across multiple tools, hits third-party APIs with strict limits, or needs to stay within per-minute budgets for cost control and reliability.
What You'll Need
- Python 3.10+
- langgraph
- langchain-core
- httpx
- An API key for the service you want to call
- A .env file or environment variables for secrets (a loading snippet follows the install command below)
- Basic familiarity with LangGraph nodes, edges, and state
Install the packages:
pip install langgraph langchain-core httpx
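The demo endpoint used later (httpbin.org) needs no key, but for a real service you would read the key from the environment rather than hard-coding it. A minimal sketch; the API_KEY variable name is just a placeholder, and python-dotenv is an optional extra (not in the install command above):

import os

# Hypothetical variable name; use whatever your provider expects.
API_KEY = os.environ.get("API_KEY", "")

# If you prefer a .env file, python-dotenv (installed separately) can load it:
# from dotenv import load_dotenv
# load_dotenv()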
Step-by-Step
- Start by defining a shared rate limiter. The simplest production-friendly pattern is a token bucket guarded by a lock so concurrent graph runs do not exceed your quota.
import time
import threading


class TokenBucket:
    """Thread-safe token bucket: `rate` tokens are added per second, up to `capacity`."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.updated_at = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self, tokens: int = 1) -> None:
        while True:
            with self.lock:
                # Refill based on the time elapsed since the last update.
                now = time.monotonic()
                elapsed = now - self.updated_at
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.updated_at = now
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                wait_time = (tokens - self.tokens) / self.rate
            # Sleep outside the lock so other waiting threads can still check the bucket.
            time.sleep(wait_time)
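You can sanity-check the bucket on its own before wiring it into a graph. A quick sketch; the exact timings will vary by a few milliseconds, but with rate=1.0 and capacity=2 the first two acquires return immediately and the third blocks for about a second:

# Optional sanity check, run outside the graph.
bucket = TokenBucket(rate=1.0, capacity=2)
start = time.monotonic()
for i in range(3):
    bucket.acquire()
    print(f"call {i} ready at {time.monotonic() - start:.2f}s")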
- Next, define your LangGraph state and a node that performs the API call through the limiter. This example uses httpx, but the same pattern works for OpenAI, Anthropic, Stripe, internal REST services, or any SDK that makes outbound requests.
from typing import TypedDict

import httpx


class GraphState(TypedDict):
    query: str
    result: str


# Shared by every node and every invocation of the compiled graph in this process.
limiter = TokenBucket(rate=2.0, capacity=2)


def call_api(state: GraphState) -> GraphState:
    limiter.acquire(1)  # blocks until a token is available
    response = httpx.get(
        "https://httpbin.org/get",
        params={"q": state["query"]},
        timeout=10.0,
    )
    response.raise_for_status()
    return {"query": state["query"], "result": response.json()["args"]["q"]}
- Build the graph with a single node first. Keeping the limiter outside the node state makes it shared across all invocations of the compiled graph, which is what you want when enforcing process-level quotas.
from langgraph.graph import StateGraph, START, END
builder = StateGraph(GraphState)
builder.add_node("call_api", call_api)
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)
graph = builder.compile()
- Run multiple requests and watch the limiter smooth them out. With a capacity of 2 and a refill rate of 2 tokens per second, the first two calls go through immediately and later calls wait their turn: the third and fourth calls each wait roughly half a second for a fresh token.
if __name__ == "__main__":
    inputs = [
        {"query": "alpha", "result": ""},
        {"query": "beta", "result": ""},
        {"query": "gamma", "result": ""},
        {"query": "delta", "result": ""},
    ]
    start = time.perf_counter()
    for item in inputs:
        output = graph.invoke(item)
        elapsed = time.perf_counter() - start
        print(f"{elapsed:.2f}s -> {output['result']}")
- If you need better observability, wrap the limiter with logging so you can see when calls are blocked and for how long. In production, this is how you catch bad fan-out patterns before they become an outage.
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("rate_limiter")


class LoggedTokenBucket(TokenBucket):
    def acquire(self, tokens: int = 1) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                elapsed = now - self.updated_at
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.updated_at = now
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    logger.info("acquired=%s remaining=%.2f", tokens, self.tokens)
                    return
                wait_time = (tokens - self.tokens) / self.rate
                logger.info("waiting %.2fs for token", wait_time)
            # As in the base class, sleep outside the lock.
            time.sleep(wait_time)
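To use it, swap the constructor; nothing in the nodes changes because they only ever call limiter.acquire():

# Same interface as TokenBucket, so the nodes keep working unchanged.
limiter = LoggedTokenBucket(rate=2.0, capacity=2)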
- For multi-node graphs, call limiter.acquire() in each node that touches an external dependency. If different APIs have different quotas, use one limiter per service instead of sharing a global bucket across everything (see the sketch after the next code block).
def call_second_api(state: GraphState) -> GraphState:
    limiter.acquire(1)
    response = httpx.get(
        "https://httpbin.org/uuid",
        timeout=10.0,
    )
    response.raise_for_status()
    return {
        "query": state["query"],
        "result": f"{state['result']} | {response.json()['uuid']}",
    }
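One way to keep per-service quotas separate is a small registry of buckets keyed by a service name you choose. A sketch; the keys, rates, and the function name below are illustrative, not prescribed by LangGraph:

# Illustrative per-service buckets; tune each to that provider's documented quota.
limiters = {
    "httpbin_get": TokenBucket(rate=2.0, capacity=2),
    "httpbin_uuid": TokenBucket(rate=5.0, capacity=5),
}

def call_second_api_per_service(state: GraphState) -> GraphState:
    # Draw from the uuid service's bucket, leaving the other quota untouched.
    limiters["httpbin_uuid"].acquire(1)
    response = httpx.get("https://httpbin.org/uuid", timeout=10.0)
    response.raise_for_status()
    return {
        "query": state["query"],
        "result": f"{state['result']} | {response.json()['uuid']}",
    }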
Testing It
Run the script and confirm that requests do not all fire at once after the first burst of tokens is consumed. You should see immediate responses for the first couple of invocations, then small delays as the bucket refills.
To test concurrency properly, invoke the graph from multiple threads, or through an async wrapper or separate processes if your real workload is distributed. If two requests arrive at nearly the same time and only one token remains, only one should proceed immediately.
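A quick thread-based check can look like this. This is a sketch: ThreadPoolExecutor is from the standard library, and the order in which results complete depends on timing and network latency.

from concurrent.futures import ThreadPoolExecutor

def run_one(query: str) -> str:
    # Each thread runs its own graph invocation; the shared limiter paces them all.
    return graph.invoke({"query": query, "result": ""})["result"]

with ThreadPoolExecutor(max_workers=4) as pool:
    print(list(pool.map(run_one, ["alpha", "beta", "gamma", "delta"])))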
Also verify failure behavior by pointing the node at a slow or failing endpoint. The limiter should still protect your quota even when retries happen upstream in your HTTP client or tool layer.
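If you add retries yourself, one way to keep the quota intact is to route every attempt through the limiter so retries count against the budget too. A rough sketch, assuming three attempts and a simple exponential backoff, not the only way to structure it:

def call_api_with_retries(state: GraphState) -> GraphState:
    # Every attempt, including retries, consumes a token so the quota holds.
    for attempt in range(3):
        limiter.acquire(1)
        response = httpx.get(
            "https://httpbin.org/get",
            params={"q": state["query"]},
            timeout=10.0,
        )
        if response.status_code == 429:
            time.sleep(2 ** attempt)  # back off 1s, 2s, 4s between attempts
            continue
        response.raise_for_status()
        return {"query": state["query"], "result": response.json()["args"]["q"]}
    raise RuntimeError("rate-limited after 3 attempts")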
Next Steps
- Add per-user or per-tenant limiters using a key like customer_id (see the sketch after this list)
- Combine this with exponential backoff and retry budgets for transient 429s
- Move from process-local locking to Redis if you need rate limits shared across multiple workers
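For the per-tenant idea, a minimal process-local sketch; the customer_id key and the rate/capacity values are placeholders you would pull from your own state and config:

from collections import defaultdict

# One bucket per tenant, created lazily on first use; process-local only.
tenant_limiters: dict[str, TokenBucket] = defaultdict(
    lambda: TokenBucket(rate=1.0, capacity=5)
)

def acquire_for_tenant(customer_id: str) -> None:
    tenant_limiters[customer_id].acquire(1)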
Keep learning
- The complete AI Agents Roadmap - my full 8-step breakdown
- Free: The AI Agent Starter Kit - PDF checklist + starter code
- Work with me - I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.