How to Integrate LangGraph for Wealth Management with Redis for Production AI
Combining LangGraph for wealth management with Redis gives you a clean path to production-grade agent systems: LangGraph handles the multi-step decision flow, while Redis stores state, checkpoints, and fast session context. For wealth management use cases, that means you can keep client conversations, portfolio analysis steps, risk checks, and compliance decisions durable across retries and process restarts.
The practical win is simple: your agent can resume from where it left off, share state across workers, and support low-latency retrieval for high-volume advisory workflows.
Prerequisites
- Python 3.10+
- A running Redis instance
  - Local: `redis-server`
  - Managed: AWS ElastiCache, Azure Cache for Redis, or Redis Cloud
- Installed packages: `langgraph`, `langgraph-checkpoint-redis` (the Redis checkpointer ships as a separate package), `langchain-core`, `redis`, and optionally `python-dotenv` if you want env-based config
- A working LLM provider configured for your LangGraph nodes
- Basic familiarity with:
  - `StateGraph` and checkpointing in LangGraph
  - Redis key/value operations
Integration Steps
- Install dependencies.

```shell
pip install langgraph langgraph-checkpoint-redis langchain-core redis python-dotenv
```
- Define your graph state and connect Redis as the checkpoint store.
LangGraph uses a checkpointer to persist graph state between runs. For production, Redis is a good fit because it is fast and supports shared state across horizontally scaled workers.
```python
import os
from typing import TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.redis import RedisSaver  # from langgraph-checkpoint-redis

class WealthState(TypedDict):
    client_id: str
    portfolio_value: float
    risk_score: int
    recommendation: str

def assess_risk(state: WealthState) -> WealthState:
    score = 80 if state["portfolio_value"] > 1_000_000 else 40
    return {**state, "risk_score": score}

def generate_recommendation(state: WealthState) -> WealthState:
    if state["risk_score"] >= 70:
        rec = "Recommend conservative allocation and liquidity review."
    else:
        rec = "Recommend balanced allocation with periodic rebalancing."
    return {**state, "recommendation": rec}

builder = StateGraph(WealthState)
builder.add_node("assess_risk", assess_risk)
builder.add_node("generate_recommendation", generate_recommendation)
builder.add_edge(START, "assess_risk")
builder.add_edge("assess_risk", "generate_recommendation")
builder.add_edge("generate_recommendation", END)

redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")

# RedisSaver.from_conn_string returns a context manager; call setup() once
# to create the Redis indices it needs. The snippets in the following steps
# assume they run inside this block (or that you manage the saver's
# lifecycle yourself in a long-running service).
with RedisSaver.from_conn_string(redis_url) as checkpointer:
    checkpointer.setup()
    graph = builder.compile(checkpointer=checkpointer)
```
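Because each node is a plain function over the state dict, the business logic can be sanity-checked without Redis, a compiled graph, or an LLM. A minimal sketch that reuses the two node functions exactly as defined above:

```python
from typing import TypedDict

class WealthState(TypedDict):
    client_id: str
    portfolio_value: float
    risk_score: int
    recommendation: str

# Same logic as the graph nodes above, exercised directly.
def assess_risk(state: WealthState) -> WealthState:
    score = 80 if state["portfolio_value"] > 1_000_000 else 40
    return {**state, "risk_score": score}

def generate_recommendation(state: WealthState) -> WealthState:
    if state["risk_score"] >= 70:
        rec = "Recommend conservative allocation and liquidity review."
    else:
        rec = "Recommend balanced allocation with periodic rebalancing."
    return {**state, "recommendation": rec}

state: WealthState = {
    "client_id": "client-12345",
    "portfolio_value": 2_500_000.0,
    "risk_score": 0,
    "recommendation": "",
}
state = generate_recommendation(assess_risk(state))
assert state["risk_score"] == 80
assert "conservative" in state["recommendation"]
```

Keeping nodes as pure functions like this is what makes the checkpointing story clean: all durable state flows through the state dict, not through globals.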
- Add a persistent thread ID so each client session resumes correctly.
LangGraph checkpoints are tied to a thread identifier. In wealth management systems, this lets you resume a client’s advisory flow after an interruption without losing context.
```python
config = {
    "configurable": {
        "thread_id": "client-12345"
    }
}

initial_state = {
    "client_id": "client-12345",
    "portfolio_value": 2_500_000,
    "risk_score": 0,
    "recommendation": ""
}

result = graph.invoke(initial_state, config=config)
print(result)
```
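Checkpoints accumulate under the thread ID, so the ID scheme matters in production: it must be stable across retries but unique per advisory session. One way to get both, sketched here with a helper and naming scheme that are my own rather than a LangGraph convention:

```python
import hashlib

def make_thread_id(client_id: str, session_tag: str) -> str:
    """Derive a stable thread ID: the same client + session tag always maps
    to the same checkpoint thread, so retries resume instead of forking state."""
    digest = hashlib.sha256(f"{client_id}:{session_tag}".encode()).hexdigest()[:12]
    return f"wm-{client_id}-{digest}"

config = {"configurable": {"thread_id": make_thread_id("client-12345", "2024-Q2-review")}}
print(config["configurable"]["thread_id"])
```

A retried request with the same client and session tag lands on the same checkpoint thread; a new quarterly review gets a fresh thread and a clean history.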
- Store auxiliary session data in Redis for fast lookup.
Use Redis directly for things like recent interactions, cached portfolio snapshots, or compliance flags. Keep this separate from LangGraph checkpoints so your workflow state stays clean.
```python
import redis

r = redis.Redis.from_url(redis_url, decode_responses=True)

session_key = f"wealth-session:{initial_state['client_id']}"
r.hset(session_key, mapping={
    "last_recommendation": result["recommendation"],
    "risk_score": str(result["risk_score"]),
})

cached = r.hgetall(session_key)
print(cached)
```
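Redis hashes hold flat string fields, so anything structured (holdings lists, nested snapshots) needs serializing on the way in and type restoration on the way out. A minimal sketch; the helper names are my own, not part of redis-py:

```python
import json

def to_redis_hash(snapshot: dict) -> dict:
    """JSON-encode each value so it fits a flat Redis hash field."""
    return {k: json.dumps(v) for k, v in snapshot.items()}

def from_redis_hash(fields: dict) -> dict:
    """Restore the original Python types after HGETALL."""
    return {k: json.loads(v) for k, v in fields.items()}

snapshot = {"risk_score": 80, "holdings": ["VTI", "BND"], "cash": 12_500.0}
assert from_redis_hash(to_redis_hash(snapshot)) == snapshot
```

This pairs directly with the calls above: `r.hset(session_key, mapping=to_redis_hash(snapshot))` going in, `from_redis_hash(r.hgetall(session_key))` coming out.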
- Rehydrate the graph from persisted state on the next request.
If the same client comes back later, pull metadata from Redis and continue the workflow with the same thread ID.
```python
saved_session = r.hgetall(session_key)

next_input = {
    "client_id": initial_state["client_id"],
    "portfolio_value": float(initial_state["portfolio_value"]),
    "risk_score": int(saved_session.get("risk_score", 0)),
    "recommendation": saved_session.get("last_recommendation", "")
}

resumed_result = graph.invoke(next_input, config=config)
print(resumed_result)
```
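Everything read back from a Redis hash is a string, so rehydration deserves a small defensive helper rather than inline casts, with safe defaults when the session hash is missing or partial. A sketch; the helper is illustrative, not a library API:

```python
def rebuild_state(client_id: str, portfolio_value: float, saved: dict) -> dict:
    """Coerce string fields from HGETALL back into the typed state shape,
    tolerating a missing or empty session hash for first-time clients."""
    return {
        "client_id": client_id,
        "portfolio_value": float(portfolio_value),
        "risk_score": int(saved.get("risk_score", 0)),
        "recommendation": saved.get("last_recommendation", ""),
    }

# First visit: no session hash yet, so defaults apply.
print(rebuild_state("client-12345", 2_500_000, {}))
# Returning client: stored strings are coerced back to int/float/str.
print(rebuild_state("client-12345", 2_500_000, {"risk_score": "80"}))
```

Centralizing the coercion in one place keeps the graph input contract honest: the nodes can trust that `risk_score` is always an `int` regardless of where the state came from.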
Testing the Integration
Run the graph once and confirm both checkpointing and Redis storage work.
```python
test_config = {"configurable": {"thread_id": "test-thread-001"}}
test_state = {
    "client_id": "test-client",
    "portfolio_value": 750_000.0,
    "risk_score": 0,
    "recommendation": ""
}

output = graph.invoke(test_state, config=test_config)
print("LangGraph output:", output)

r.set("integration:test-client", output["recommendation"])
print("Redis value:", r.get("integration:test-client"))
```

Expected output:

```text
LangGraph output: {'client_id': 'test-client', 'portfolio_value': 750000.0, 'risk_score': 40, 'recommendation': 'Recommend balanced allocation with periodic rebalancing.'}
Redis value: Recommend balanced allocation with periodic rebalancing.
```
Real-World Use Cases
- Client advisory agents that keep long-running wealth planning sessions durable across retries and worker restarts.
- Compliance-aware recommendation flows where each decision step is checkpointed and audit-friendly.
- Portfolio review assistants that cache recent holdings snapshots in Redis for low-latency access during interactive sessions.
Keep learning
- The complete AI Agents Roadmap — my full 8-step breakdown
- Free: The AI Agent Starter Kit — PDF checklist + starter code
- Work with me — I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.