How to Integrate LangGraph with Redis for Wealth-Management AI Agents
Wealth-management agents need two things that usually fight each other: durable workflow state and fast, shared memory. LangGraph gives you the control flow for multi-step advisor logic, while Redis gives you low-latency state storage, session memory, and a clean way to coordinate agent runs across workers.
Prerequisites
- Python 3.10+
- A running Redis instance
  - Local: `redis-server`
  - Docker: `docker run -p 6379:6379 redis:7`
- LangGraph installed
- Redis Python client installed
- An LLM provider configured for your graph nodes
- Basic familiarity with:
  - `StateGraph`
  - `MemorySaver` or a custom checkpointer
  - Redis key/value operations

Install the packages:

```shell
pip install langgraph redis langchain-openai
```
Integration Steps
Step 1: Define the wealth-management state

Keep the graph state explicit. For wealth workflows, I usually track the client profile, portfolio summary, risk score, and the latest recommendation.
```python
from typing import TypedDict, Annotated
from operator import add

class WealthState(TypedDict):
    client_id: str
    profile: dict
    portfolio: dict
    risk_score: float
    recommendation: str
    messages: Annotated[list, add]
```
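The `Annotated[list, add]` field tells LangGraph to combine each node's returned `messages` with the existing list instead of overwriting it, while plain fields like `risk_score` are simply replaced. A minimal sketch of that merge behavior (the `merge_update` helper is mine, illustrating the idea, not LangGraph's internal API):

```python
from operator import add

def merge_update(state: dict, update: dict, reducers: dict) -> dict:
    # Fields with a registered reducer are combined with it;
    # everything else is overwritten by the node's partial update.
    merged = dict(state)
    for key, value in update.items():
        if key in reducers and key in merged:
            merged[key] = reducers[key](merged[key], value)
        else:
            merged[key] = value
    return merged

state = {"messages": [{"role": "user", "content": "Review my portfolio"}], "risk_score": 0.0}
update = {"messages": [{"role": "assistant", "content": "Checking allocations"}], "risk_score": 0.58}

new_state = merge_update(state, update, {"messages": add})
print(len(new_state["messages"]))  # 2: lists are concatenated
print(new_state["risk_score"])     # 0.58: plain field is overwritten
```

This is why nodes below can return tiny dicts like `{"profile": profile}` rather than the whole state.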
Step 2: Create LangGraph nodes for analysis and recommendation

Build small nodes that each do one job. In production, this makes it easier to test suitability checks, allocation logic, and compliance gates separately.
```python
from langgraph.graph import StateGraph, START, END

def load_client_profile(state: WealthState):
    # Replace with CRM / core banking lookup
    profile = {
        "age": 42,
        "goal": "retirement",
        "risk_tolerance": "moderate",
        "jurisdiction": "US",
    }
    return {"profile": profile}

def assess_portfolio(state: WealthState):
    # Replace with portfolio service call
    portfolio = {
        "equities_pct": 60,
        "bonds_pct": 30,
        "cash_pct": 10,
        "aum_usd": 250000,
    }
    risk_score = 0.58
    return {"portfolio": portfolio, "risk_score": risk_score}

def generate_recommendation(state: WealthState):
    if state["risk_score"] > 0.7:
        rec = "Reduce equity exposure and increase fixed income allocation."
    else:
        rec = "Portfolio is aligned with moderate risk tolerance."
    return {"recommendation": rec}

graph = StateGraph(WealthState)
graph.add_node("load_client_profile", load_client_profile)
graph.add_node("assess_portfolio", assess_portfolio)
graph.add_node("generate_recommendation", generate_recommendation)
graph.add_edge(START, "load_client_profile")
graph.add_edge("load_client_profile", "assess_portfolio")
graph.add_edge("assess_portfolio", "generate_recommendation")
graph.add_edge("generate_recommendation", END)
```
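The edges above are strictly linear, but LangGraph also supports branching via `add_conditional_edges`, which is useful when high-risk clients should be routed to a human or compliance gate instead of straight to a recommendation. A sketch of the routing function such a branch would use (the `compliance_review` node name is hypothetical, not defined above):

```python
def route_by_risk(state: "WealthState") -> str:
    # Returns the name of the next node to run. High-risk portfolios
    # go to a (hypothetical) compliance gate; everything else proceeds
    # directly to the recommendation node.
    if state["risk_score"] > 0.7:
        return "compliance_review"
    return "generate_recommendation"

# Wiring sketch, assuming a compliance_review node has been added:
# graph.add_conditional_edges("assess_portfolio", route_by_risk)
```

Keeping the routing logic in a plain function like this means the suitability threshold can be unit-tested without running the graph at all.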
Step 3: Use Redis as the shared persistence layer

Redis works well for session state and cross-process coordination. In an agent system, I use it for conversation context, execution markers, and short-lived cached financial data.
```python
import json

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

client_id = "client_123"

# Store a lightweight session snapshot
r.set(
    f"wealth:{client_id}:session",
    json.dumps({
        "last_run_status": "started",
        "workflow": "portfolio_review",
    }),
    ex=3600,
)

# Store cached market data or derived features
r.hset(
    f"wealth:{client_id}:portfolio_cache",
    mapping={
        "equities_pct": 60,
        "bonds_pct": 30,
        "cash_pct": 10,
    },
)
```
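Every Redis key in this integration hangs off the same `wealth:<client_id>:<suffix>` namespace, so it is worth centralizing key construction in one helper instead of formatting strings inline everywhere. The helper below is my suggested convention, not part of either library:

```python
def wealth_key(client_id: str, suffix: str) -> str:
    # Single place that defines the "wealth:<client_id>:<suffix>" namespace,
    # so session, cache, lock, and result keys stay consistent across services.
    if not client_id or ":" in client_id:
        # Colons would collide with the namespace separator.
        raise ValueError(f"invalid client_id: {client_id!r}")
    return f"wealth:{client_id}:{suffix}"

print(wealth_key("client_123", "session"))          # wealth:client_123:session
print(wealth_key("client_123", "portfolio_cache"))  # wealth:client_123:portfolio_cache
```

Validating `client_id` here also gives you one choke point to stop malformed IDs from scattering junk keys across the instance.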
Step 4: Connect LangGraph execution to Redis-backed state

The clean pattern is: read from Redis before invoking the graph, then write graph outputs back after execution. If you want durable thread-level checkpoints across runs, LangGraph also supports checkpointing via `MemorySaver` or a custom saver implementation.
```python
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

thread_id = f"wealth-{client_id}"

# Load prior context from Redis if needed
cached_portfolio = r.hgetall(f"wealth:{client_id}:portfolio_cache")

initial_state: WealthState = {
    "client_id": client_id,
    "profile": {},
    "portfolio": dict(cached_portfolio),
    "risk_score": 0.0,
    "recommendation": "",
    "messages": [],
}

result = app.invoke(
    initial_state,
    config={"configurable": {"thread_id": thread_id}},
)

# Persist result to Redis for other services/agents to consume
r.set(f"wealth:{client_id}:latest_recommendation", result["recommendation"])
r.hset(
    f"wealth:{client_id}:latest_result",
    mapping={
        "risk_score": str(result["risk_score"]),
        "recommendation": result["recommendation"],
    },
)
```
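One thing to keep in mind on the consumer side: with `decode_responses=True`, everything read back from a Redis hash comes out as a string, so downstream services need to coerce numeric fields themselves. A small parsing sketch for the `latest_result` hash written above (the helper name is mine):

```python
def parse_latest_result(raw: dict) -> dict:
    # Redis hashes store flat string fields; restore the types the
    # graph produced before handing the result to other services.
    return {
        "risk_score": float(raw.get("risk_score", "0")),
        "recommendation": raw.get("recommendation", ""),
    }

# In a consumer service: raw = r.hgetall(f"wealth:{client_id}:latest_result")
raw = {"risk_score": "0.58", "recommendation": "Portfolio is aligned with moderate risk tolerance."}
parsed = parse_latest_result(raw)
print(parsed["risk_score"])  # 0.58, as a float rather than a string
```

If the result payload grows beyond a couple of flat fields, storing a single JSON string (as the session snapshot does) is usually simpler than widening the hash.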
Step 5: Add a Redis-backed guardrail for multi-agent coordination

In wealth-management systems, you often have one agent drafting advice and another validating compliance. Use Redis locks or status keys so two workers don't process the same client simultaneously.
```python
lock_key = f"wealth:{client_id}:lock"

if r.set(lock_key, thread_id, nx=True, ex=120):
    try:
        final_result = app.invoke(
            initial_state,
            config={"configurable": {"thread_id": thread_id}},
        )
        r.set(f"wealth:{client_id}:status", "completed")
    finally:
        # Only release the lock if we still own it (it may have expired
        # and been re-acquired by another worker in the meantime)
        current_owner = r.get(lock_key)
        if current_owner == thread_id:
            r.delete(lock_key)
else:
    raise RuntimeError(f"Client {client_id} is already being processed by another worker")
```
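Raising immediately when the lock is held pushes the retry decision onto the caller. An alternative worth considering is a small acquire-with-backoff helper; the version below is a sketch where the `acquire` callable is injected, so the retry logic can be tested without a live Redis:

```python
import time

def acquire_with_backoff(acquire, attempts: int = 5, base_delay: float = 0.1) -> bool:
    # acquire: zero-arg callable returning True once the lock is taken,
    # e.g. lambda: bool(r.set(lock_key, thread_id, nx=True, ex=120))
    for attempt in range(attempts):
        if acquire():
            return True
        # Exponential backoff between attempts: 0.1s, 0.2s, 0.4s, ...
        time.sleep(base_delay * (2 ** attempt))
    return False

# Simulated: the lock frees up on the third try
tries = iter([False, False, True])
print(acquire_with_backoff(lambda: next(tries), base_delay=0.001))  # True
```

For stricter guarantees (atomic check-and-release, fencing tokens), look at Redis's documented `SET NX` lock pattern rather than growing this helper.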
Testing the Integration
Run a simple end-to-end test that verifies LangGraph returns a recommendation and Redis stores it.
```python
test_result = app.invoke(
    {
        "client_id": "client_123",
        "profile": {},
        "portfolio": {},
        "risk_score": 0.0,
        "recommendation": "",
        "messages": [],
    },
    config={"configurable": {"thread_id": "wealth-client_123"}},
)

saved_rec = r.get("wealth:client_123:latest_recommendation")
print("LangGraph recommendation:", test_result["recommendation"])
print("Redis saved recommendation:", saved_rec)
```
Expected output:

```
LangGraph recommendation: Portfolio is aligned with moderate risk tolerance.
Redis saved recommendation: Portfolio is aligned with moderate risk tolerance.
```
Real-World Use Cases
- Advisor copilot: run a LangGraph workflow that pulls client data, checks suitability rules, drafts an investment summary, then stores the final note in Redis for CRM sync.
- Portfolio review orchestration: use LangGraph for stepwise analysis and Redis to cache market snapshots so multiple agents can reuse the same pricing data during a review window.
- Compliance-aware escalation: have one node generate advice and another node validate against policy thresholds; use Redis to track approval status and prevent duplicate escalations.
If you’re building this for production, keep LangGraph focused on workflow logic and let Redis handle ephemeral coordination plus shared session state. That separation keeps your agent system predictable when volume rises and multiple workers start touching the same client record.
Keep learning
- The complete AI Agents Roadmap: my full 8-step breakdown
- Free: The AI Agent Starter Kit (PDF checklist + starter code)
- Work with me: I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.
Want the complete 8-step roadmap?
Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.
Get the Starter Kit