LangGraph Tutorial (Python): rate limiting API calls for advanced developers

By Cyprian Aarons · Updated 2026-04-22

This tutorial shows how to rate limit API calls inside a LangGraph workflow in Python without blocking the whole app or letting one bursty agent burn through your quota. Use this pattern when your graph fans out across multiple tools, hits third-party APIs with strict limits, or must stay within per-minute budgets for cost control and reliability.

What You'll Need

  • Python 3.10+
  • langgraph
  • langchain-core
  • httpx
  • An API key for the service you want to call
  • A .env file or environment variables for secrets
  • Basic familiarity with LangGraph nodes, edges, and state

Install the packages:

pip install langgraph langchain-core httpx

Step-by-Step

  1. Start by defining a shared rate limiter. The simplest production-friendly pattern is a token bucket guarded by a lock so concurrent graph runs do not exceed your quota.
import time
import threading


class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.updated_at = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self, tokens: int = 1) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                elapsed = now - self.updated_at
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.updated_at = now

                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return

                wait_time = (tokens - self.tokens) / self.rate

            time.sleep(wait_time)
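One caveat: if your graph runs under asyncio (for example via the compiled graph's async invocation path), the blocking time.sleep above stalls the event loop. A minimal async sketch of the same bucket, using asyncio.Lock and awaiting instead of sleeping, could look like this (the class name is ours, not part of LangGraph):

```python
import asyncio
import time


class AsyncTokenBucket:
    """Async variant of the token bucket: awaits instead of blocking the loop."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated_at = time.monotonic()
        self.lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> None:
        while True:
            async with self.lock:
                now = time.monotonic()
                # Refill based on elapsed time, capped at capacity.
                self.tokens = min(self.capacity, self.tokens + (now - self.updated_at) * self.rate)
                self.updated_at = now
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                wait_time = (tokens - self.tokens) / self.rate
            # Sleep outside the lock so other coroutines can compete fairly.
            await asyncio.sleep(wait_time)
```

The structure mirrors the threaded version exactly; only the lock type and the sleep primitive change.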
  2. Next, define your LangGraph state and a node that performs the API call through the limiter. This example uses httpx, but the same pattern works for OpenAI, Anthropic, Stripe, internal REST services, or any SDK that makes outbound requests.
from typing import TypedDict

import httpx


class GraphState(TypedDict):
    query: str
    result: str


limiter = TokenBucket(rate=2.0, capacity=2)


def call_api(state: GraphState) -> GraphState:
    limiter.acquire(1)

    response = httpx.get(
        "https://httpbin.org/get",
        params={"q": state["query"]},
        timeout=10.0,
    )
    response.raise_for_status()

    return {"query": state["query"], "result": response.json()["args"]["q"]}
  3. Build the graph with a single node first. Keeping the limiter outside the node state makes it shared across all invocations of the compiled graph, which is what you want when enforcing process-level quotas.
from langgraph.graph import StateGraph, START, END


builder = StateGraph(GraphState)
builder.add_node("call_api", call_api)
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

graph = builder.compile()
  4. Run multiple requests and watch the limiter smooth them out. With a capacity of 2 and refill rate of 2 tokens per second, the first two calls go through immediately and later calls wait their turn.
if __name__ == "__main__":
    inputs = [
        {"query": "alpha", "result": ""},
        {"query": "beta", "result": ""},
        {"query": "gamma", "result": ""},
        {"query": "delta", "result": ""},
    ]

    start = time.perf_counter()
    for item in inputs:
        output = graph.invoke(item)
        elapsed = time.perf_counter() - start
        print(f"{elapsed:.2f}s -> {output['result']}")
  5. If you need better observability, wrap the limiter with logging so you can see when calls are blocked and for how long. In production, this is how you catch bad fan-out patterns before they become an outage.
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("rate_limiter")


class LoggedTokenBucket(TokenBucket):
    def acquire(self, tokens: int = 1) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                elapsed = now - self.updated_at
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.updated_at = now

                if self.tokens >= tokens:
                    self.tokens -= tokens
                    logger.info("acquired=%s remaining=%.2f", tokens, self.tokens)
                    return

                wait_time = (tokens - self.tokens) / self.rate
                logger.info("waiting %.2fs for token", wait_time)

            time.sleep(wait_time)
  6. For multi-node graphs, call limiter.acquire() in each node that touches an external dependency. If different APIs have different quotas, use one limiter per service instead of sharing a global bucket across everything.
def call_second_api(state: GraphState) -> GraphState:
    limiter.acquire(1)

    response = httpx.get(
        "https://httpbin.org/uuid",
        timeout=10.0,
    )
    response.raise_for_status()

    return {
        "query": state["query"],
        "result": f"{state['result']} | {response.json()['uuid']}",
    }
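One way to organize per-service limiters is a small registry keyed by service name. The service names, rates, and the acquire_for helper below are illustrative, not part of LangGraph; nodes would call acquire_for("search") instead of a shared global limiter.acquire(). The bucket class is repeated from Step 1 so the snippet runs standalone.

```python
import threading
import time


class TokenBucket:
    """Step 1's token bucket, repeated so this snippet runs standalone."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated_at = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self, tokens: int = 1) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                self.tokens = min(self.capacity, self.tokens + (now - self.updated_at) * self.rate)
                self.updated_at = now
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                wait_time = (tokens - self.tokens) / self.rate
            time.sleep(wait_time)


# One bucket per external service; these names and limits are illustrative.
LIMITERS = {
    "search": TokenBucket(rate=2.0, capacity=2),
    "billing": TokenBucket(rate=0.5, capacity=1),
}


def acquire_for(service: str, tokens: int = 1) -> None:
    """Block until the named service's bucket has capacity."""
    LIMITERS[service].acquire(tokens)
```

This keeps each quota independent: a burst against the search API cannot starve nodes that talk to billing.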

Testing It

Run the script and confirm that requests do not all fire at once after the first burst of tokens is consumed. You should see immediate responses for the first couple of invocations, then small delays as the bucket refills.

To test concurrency properly, invoke the graph from multiple threads. If two requests arrive at nearly the same time and only one token remains, only one should proceed immediately. Keep in mind that this limiter is process-local: separate worker processes each get their own bucket, so a distributed workload needs a shared store such as Redis (see Next Steps).
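A minimal thread-based check can exercise just the bucket rather than the full graph (the bucket class is repeated from Step 1 so the snippet stands alone; the rate and worker count are arbitrary test values):

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class TokenBucket:
    """Step 1's token bucket, repeated so this snippet runs standalone."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated_at = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self, tokens: int = 1) -> None:
        while True:
            with self.lock:
                now = time.monotonic()
                self.tokens = min(self.capacity, self.tokens + (now - self.updated_at) * self.rate)
                self.updated_at = now
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                wait_time = (tokens - self.tokens) / self.rate
            time.sleep(wait_time)


bucket = TokenBucket(rate=5.0, capacity=1)
stamps: list[float] = []


def worker() -> None:
    bucket.acquire()
    stamps.append(time.monotonic())  # list.append is atomic under CPython


start = time.monotonic()
with ThreadPoolExecutor(max_workers=4) as pool:
    for _ in range(4):
        pool.submit(worker)
```

With capacity 1 and a refill rate of 5 tokens per second, only the first worker proceeds immediately; the other three are spaced out as the bucket refills.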

Also verify failure behavior by pointing the node at a slow or failing endpoint. The limiter should still protect your quota even when retries happen upstream in your HTTP client or tool layer.

Next Steps

  • Add per-user or per-tenant limiters using a key like customer_id
  • Combine this with exponential backoff and retry budgets for transient 429s
  • Move from process-local locking to Redis if you need rate limits shared across multiple workers


By Cyprian Aarons, AI Consultant at Topiax.
