How to Integrate LangGraph for wealth management with Redis for production AI

By Cyprian AaronsUpdated 2026-04-21
langgraph-for-wealth-managementredisproduction-ai

Combining LangGraph for wealth management with Redis gives you a clean path to production-grade agent systems: LangGraph handles the multi-step decision flow, while Redis stores state, checkpoints, and fast session context. For wealth management use cases, that means you can keep client conversations, portfolio analysis steps, risk checks, and compliance decisions durable across retries and process restarts.

The practical win is simple: your agent can resume from where it left off, share state across workers, and support low-latency retrieval for high-volume advisory workflows.

Prerequisites

  • Python 3.10+
  • A running Redis instance
    • Local: redis-server
    • Managed: AWS ElastiCache, Azure Cache for Redis, or Redis Cloud
  • Installed packages:
    • langgraph
    • langchain-core
    • redis
    • python-dotenv if you want env-based config
  • A working LLM provider configured for your LangGraph nodes
  • Basic familiarity with:
    • StateGraph
    • checkpointing in LangGraph
    • Redis key/value operations

Integration Steps

  1. Install dependencies.
pip install langgraph langchain-core redis python-dotenv
  1. Define your graph state and connect Redis as the checkpoint store.

LangGraph uses a checkpointer to persist graph state between runs. For production, Redis is a good fit because it is fast and supports shared state across horizontally scaled workers.

import os
from typing import TypedDict, Annotated

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.redis import RedisSaver

class WealthState(TypedDict):
    client_id: str
    portfolio_value: float
    risk_score: int
    recommendation: str

def assess_risk(state: WealthState) -> WealthState:
    score = 80 if state["portfolio_value"] > 1_000_000 else 40
    return {**state, "risk_score": score}

def generate_recommendation(state: WealthState) -> WealthState:
    if state["risk_score"] >= 70:
        rec = "Recommend conservative allocation and liquidity review."
    else:
        rec = "Recommend balanced allocation with periodic rebalancing."
    return {**state, "recommendation": rec}

builder = StateGraph(WealthState)
builder.add_node("assess_risk", assess_risk)
builder.add_node("generate_recommendation", generate_recommendation)

builder.add_edge(START, "assess_risk")
builder.add_edge("assess_risk", "generate_recommendation")
builder.add_edge("generate_recommendation", END)

redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
checkpointer = RedisSaver.from_conn_string(redis_url)

graph = builder.compile(checkpointer=checkpointer)
  1. Add a persistent thread ID so each client session resumes correctly.

LangGraph checkpoints are tied to a thread identifier. In wealth management systems, this lets you resume a client’s advisory flow after an interruption without losing context.

config = {
    "configurable": {
        "thread_id": "client-12345"
    }
}

initial_state = {
    "client_id": "client-12345",
    "portfolio_value": 2_500_000,
    "risk_score": 0,
    "recommendation": ""
}

result = graph.invoke(initial_state, config=config)
print(result)
  1. Store auxiliary session data in Redis for fast lookup.

Use Redis directly for things like recent interactions, cached portfolio snapshots, or compliance flags. Keep this separate from LangGraph checkpoints so your workflow state stays clean.

import redis

r = redis.Redis.from_url(redis_url, decode_responses=True)

session_key = f"wealth-session:{initial_state['client_id']}"
r.hset(session_key, mapping={
    "last_recommendation": result["recommendation"],
    "risk_score": str(result["risk_score"]),
})

cached = r.hgetall(session_key)
print(cached)
  1. Rehydrate the graph from persisted state on the next request.

If the same client comes back later, pull metadata from Redis and continue the workflow with the same thread ID.

saved_session = r.hgetall(session_key)

next_input = {
    "client_id": initial_state["client_id"],
    "portfolio_value": float(initial_state["portfolio_value"]),
    "risk_score": int(saved_session.get("risk_score", 0)),
    "recommendation": saved_session.get("last_recommendation", "")
}

resumed_result = graph.invoke(next_input, config=config)
print(resumed_result)

Testing the Integration

Run the graph once and confirm both checkpointing and Redis storage work.

test_config = {"configurable": {"thread_id": "test-thread-001"}}
test_state = {
    "client_id": "test-client",
    "portfolio_value": 750_000,
    "risk_score": 0,
    "recommendation": ""
}

output = graph.invoke(test_state, config=test_config)
print("LangGraph output:", output)

r.set("integration:test-client", output["recommendation"])
print("Redis value:", r.get("integration:test-client"))

Expected output:

LangGraph output: {'client_id': 'test-client', 'portfolio_value': 750000.0, 'risk_score': 40, 'recommendation': 'Recommend balanced allocation with periodic rebalancing.'}
Redis value: Recommend balanced allocation with periodic rebalancing.

Real-World Use Cases

  • Client advisory agents that keep long-running wealth planning sessions durable across retries and worker restarts.
  • Compliance-aware recommendation flows where each decision step is checkpointed and audit-friendly.
  • Portfolio review assistants that cache recent holdings snapshots in Redis for low-latency access during interactive sessions.

Keep learning

By Cyprian Aarons, AI Consultant at Topiax.

Want the complete 8-step roadmap?

Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.

Get the Starter Kit

Related Guides