How to Integrate LangGraph for wealth management with Kubernetes for RAG
Wealth management RAG systems need two things that usually fight each other: strong orchestration and predictable infrastructure. LangGraph gives you the graph-based control flow for retrieval, review, and compliance checks, while Kubernetes gives you the operational layer to run those agents, retrievers, and vector services with proper scaling and isolation.
Prerequisites
- •Python 3.10+
- •A Kubernetes cluster with kubectl access
- •The kubernetes Python client installed
- •LangGraph installed with your wealth-management agent package
- •Access to a vector store or document service for RAG
- •A namespace in Kubernetes for your agent workloads
- •Service account credentials or in-cluster RBAC configured
Install the Python dependencies:
pip install langgraph kubernetes pydantic requests
Integration Steps
- •Define the LangGraph workflow for wealth management RAG
Start with a graph that takes a client query, retrieves portfolio or policy documents, then routes through a compliance check before generating the final answer.
from typing import TypedDict, List
from langgraph.graph import StateGraph, END

class WealthState(TypedDict):
    query: str
    retrieved_docs: List[str]
    compliance_passed: bool
    answer: str

def retrieve_docs(state: WealthState):
    query = state["query"]
    docs = [f"Retrieved wealth policy context for: {query}"]
    return {"retrieved_docs": docs}

def compliance_check(state: WealthState):
    docs = state["retrieved_docs"]
    passed = any("policy" in doc.lower() for doc in docs)
    return {"compliance_passed": passed}

def generate_answer(state: WealthState):
    if not state["compliance_passed"]:
        return {"answer": "Request blocked by compliance policy."}
    return {"answer": f"Answer based on {len(state['retrieved_docs'])} retrieved documents."}

graph = StateGraph(WealthState)
graph.add_node("retrieve_docs", retrieve_docs)
graph.add_node("compliance_check", compliance_check)
graph.add_node("generate_answer", generate_answer)
graph.set_entry_point("retrieve_docs")
graph.add_edge("retrieve_docs", "compliance_check")
graph.add_edge("compliance_check", "generate_answer")
graph.add_edge("generate_answer", END)

app = graph.compile()
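The linear graph above always reaches generate_answer and lets that node decide what to return. If you would rather branch in the graph itself, a router function can pick the next node from the state. The sketch below is a minimal illustration, not a definitive implementation: `route_after_compliance` and the `blocked` node are hypothetical names, and the wiring shown in the trailing comment assumes LangGraph's `add_conditional_edges` with a router that returns node names directly.

```python
from typing import List, TypedDict


class WealthState(TypedDict):
    query: str
    retrieved_docs: List[str]
    compliance_passed: bool
    answer: str


def route_after_compliance(state: WealthState) -> str:
    """Return the name of the next node based on the compliance flag."""
    return "generate_answer" if state["compliance_passed"] else "blocked"


def blocked(state: WealthState) -> dict:
    """Hypothetical terminal node for requests that fail compliance."""
    return {"answer": "Request blocked by compliance policy."}


# Assumed wiring, replacing the fixed compliance_check -> generate_answer edge:
#   graph.add_node("blocked", blocked)
#   graph.add_conditional_edges("compliance_check", route_after_compliance)
#   graph.add_edge("blocked", END)
```

With this layout, generate_answer no longer needs its own compliance branch, because failed requests never reach it.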
- •Use Kubernetes to discover your RAG services
In production, your retriever or vector DB will usually sit behind a Kubernetes Service. Use the official Kubernetes Python client to resolve the service endpoint before calling it from your LangGraph node.
from kubernetes import client, config

def get_vector_service_endpoint(namespace: str = "wealth-ai", service_name: str = "vector-db"):
    # Prefer in-cluster credentials; fall back to the local kubeconfig for development.
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()

    v1 = client.CoreV1Api()
    svc = v1.read_namespaced_service(service_name, namespace)
    # Use the Service's ClusterIP and its first declared port.
    ip = svc.spec.cluster_ip
    port = svc.spec.ports[0].port
    return f"http://{ip}:{port}"

endpoint = get_vector_service_endpoint()
print(endpoint)
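Looking up the ClusterIP requires an API call and RBAC permission to read Services. Inside the cluster, a Service is also reachable by its DNS name, which avoids that call. The helper below is a sketch under assumptions: `service_dns_url` is a hypothetical name, port 8080 is only an example, and `cluster.local` is the default cluster domain, which your cluster may override.

```python
def service_dns_url(
    service_name: str,
    namespace: str,
    port: int,
    cluster_domain: str = "cluster.local",  # assumption: default cluster domain
) -> str:
    """Build the in-cluster URL for a Service from its DNS name."""
    return f"http://{service_name}.{namespace}.svc.{cluster_domain}:{port}"


# Example port only; use the port your Service actually exposes.
print(service_dns_url("vector-db", "wealth-ai", 8080))
```

The trade-off is that the DNS form only resolves from inside the cluster, while the API lookup also works from a workstation with a kubeconfig (though the ClusterIP it returns is still only routable in-cluster).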
- •Wire the Kubernetes-backed retriever into LangGraph
Now connect the graph node to your actual RAG service. This example calls an HTTP retriever exposed through Kubernetes.
import requests

def retrieve_from_k8s_rag(state: WealthState):
    endpoint = get_vector_service_endpoint()
    resp = requests.post(
        f"{endpoint}/search",
        json={"query": state["query"], "top_k": 3},
        timeout=10,
    )
    resp.raise_for_status()
    results = resp.json()["results"]
    return {"retrieved_docs": [item["text"] for item in results]}

graph = StateGraph(WealthState)
graph.add_node("retrieve_docs", retrieve_from_k8s_rag)
graph.add_node("compliance_check", compliance_check)
graph.add_node("generate_answer", generate_answer)
graph.set_entry_point("retrieve_docs")
graph.add_edge("retrieve_docs", "compliance_check")
graph.add_edge("compliance_check", "generate_answer")
graph.add_edge("generate_answer", END)

app = graph.compile()
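The retriever node above assumes the service always returns a `results` list of objects with a `text` field. A small parsing helper makes that assumption explicit and fails with a clear error when the shape differs. This is a sketch: `extract_texts` is a hypothetical helper, and the payload shape is taken only from the example above, not from any particular vector service.

```python
from typing import Any, List


def extract_texts(payload: Any) -> List[str]:
    """Return the text of each result, or raise ValueError on an unexpected shape."""
    if not isinstance(payload, dict) or not isinstance(payload.get("results"), list):
        raise ValueError("expected a JSON object with a 'results' list")
    texts = []
    for item in payload["results"]:
        if not isinstance(item, dict) or not isinstance(item.get("text"), str):
            raise ValueError("each result must be an object with a string 'text' field")
        texts.append(item["text"])
    return texts


sample = {"results": [{"text": "Policy A"}, {"text": "Disclosure B"}]}
print(extract_texts(sample))  # ['Policy A', 'Disclosure B']
```

Inside the retriever node, this could replace the list comprehension with `extract_texts(resp.json())`.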
- •Run the workflow from inside Kubernetes
For production use, deploy the agent as a container in the same cluster so it can talk to internal Services directly. The code below shows a simple invocation path you can put behind an API endpoint or job worker.
def run_wealth_rag(query: str):
    result = app.invoke({"query": query, "retrieved_docs": [], "compliance_passed": False, "answer": ""})
    return result["answer"]

if __name__ == "__main__":
    answer = run_wealth_rag("Can this client increase equity exposure?")
    print(answer)
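For the job worker option, one possible shape is a loop that pulls queries from a queue and hands each one to the runner. The sketch below uses Python's in-process `queue.Queue` and a stub runner so it runs standalone; in a real deployment the queue would more likely be an external broker and the runner would be `run_wealth_rag`. `worker_loop` and `fake_runner` are hypothetical names.

```python
import queue
from typing import Callable, List, Optional


def worker_loop(
    jobs: "queue.Queue[Optional[str]]",
    runner: Callable[[str], str],
) -> List[str]:
    """Process queries until a None sentinel arrives; return the answers in order."""
    answers = []
    while True:
        query = jobs.get()
        if query is None:  # sentinel: stop the worker
            break
        try:
            answers.append(runner(query))
        except Exception as exc:  # keep the worker alive if one run fails
            answers.append(f"error: {exc}")
    return answers


def fake_runner(query: str) -> str:
    """Stand-in for run_wealth_rag so the sketch has no cluster dependency."""
    return f"answered: {query}"


pending: "queue.Queue[Optional[str]]" = queue.Queue()
pending.put("Can this client increase equity exposure?")
pending.put(None)
print(worker_loop(pending, fake_runner))
```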
- •Deploy with a Kubernetes-native execution model
If you want stronger isolation per request or per tenant, wrap each run in a Job or use a worker deployment backed by a queue. Here’s a minimal example that creates a Job using the Kubernetes API.
from kubernetes import client, config

def create_agent_job(namespace="wealth-ai"):
    # Load credentials before building API clients.
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()

    batch_v1 = client.BatchV1Api()
    job_manifest = client.V1Job(
        metadata=client.V1ObjectMeta(name="wealth-rag-agent"),
        spec=client.V1JobSpec(
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"app": "wealth-rag-agent"}),
                spec=client.V1PodSpec(
                    restart_policy="Never",
                    containers=[
                        client.V1Container(
                            name="agent",
                            image="your-registry/wealth-rag-agent:latest",
                            # Do not set KUBERNETES_SERVICE_HOST here: Kubernetes injects it,
                            # and load_incluster_config() reads it as the API server host.
                        )
                    ],
                ),
            ),
            backoff_limit=2,
        ),
    )
    return batch_v1.create_namespaced_job(namespace=namespace, body=job_manifest)

# create_agent_job()
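A fixed Job name such as `wealth-rag-agent` can exist only once per namespace, so per-request or per-tenant runs need distinct names. One way to get them is to build the manifest as a plain dictionary with a generated suffix; the Python client's `create_namespaced_job` accepts a dict body as well as a `V1Job`. The sketch below is illustrative: `build_job_manifest`, the `tenant` label, and the one-hour `ttlSecondsAfterFinished` value are assumptions, not part of the setup above.

```python
import uuid


def build_job_manifest(tenant: str, image: str) -> dict:
    """Return a batch/v1 Job manifest with a unique name for one agent run."""
    run_id = uuid.uuid4().hex[:8]  # short random suffix to avoid name collisions
    labels = {"app": "wealth-rag-agent", "tenant": tenant}
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": f"wealth-rag-agent-{run_id}", "labels": labels},
        "spec": {
            "backoffLimit": 2,
            "ttlSecondsAfterFinished": 3600,  # assumption: clean up after one hour
            "template": {
                "metadata": {"labels": labels},
                "spec": {
                    "restartPolicy": "Never",
                    "containers": [{"name": "agent", "image": image}],
                },
            },
        },
    }


manifest = build_job_manifest("emea", "your-registry/wealth-rag-agent:latest")
print(manifest["metadata"]["name"])
```

The resulting dict could be passed as `body` to `batch_v1.create_namespaced_job(namespace=..., body=manifest)`.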
Testing the Integration
Use a real query and verify that LangGraph completes the retrieval and compliance path while Kubernetes resolves the backing service.
result = app.invoke({
    "query": "What is the recommended allocation for moderate risk clients?",
    "retrieved_docs": [],
    "compliance_passed": False,
    "answer": ""
})

print(result["retrieved_docs"])
print(result["compliance_passed"])
print(result["answer"])
Expected output with the placeholder retriever from the first step:
['Retrieved wealth policy context for: What is the recommended allocation for moderate risk clients?']
True
Answer based on 1 retrieved documents.
If your retriever is wired to a real Kubernetes-hosted vector service, you should see actual document snippets instead of placeholder text, and the compliance result will depend on whether those snippets mention a policy.
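It is also worth exercising the blocked path, since the sample query only covers the passing case. The check below copies `compliance_check` and `generate_answer` from the first step so it runs without LangGraph or a cluster, and feeds them a document that does not mention a policy. The query and document are made up for illustration.

```python
def compliance_check(state: dict) -> dict:
    # Same rule as the workflow above: pass only if some document mentions "policy".
    passed = any("policy" in doc.lower() for doc in state["retrieved_docs"])
    return {"compliance_passed": passed}


def generate_answer(state: dict) -> dict:
    if not state["compliance_passed"]:
        return {"answer": "Request blocked by compliance policy."}
    return {"answer": f"Answer based on {len(state['retrieved_docs'])} retrieved documents."}


# Hypothetical input: one retrieved document with no policy content.
state = {"query": "Any hot stock tips?", "retrieved_docs": ["Unrelated market commentary"]}
state.update(compliance_check(state))
state.update(generate_answer(state))

print(state["compliance_passed"])  # False
print(state["answer"])  # Request blocked by compliance policy.
```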
Real-World Use Cases
- •Advisor copilot with policy gating
  Build an agent that answers portfolio questions only after retrieving approved product disclosures and running compliance checks.
- •Client servicing automation
  Route inbound questions about retirement plans, account changes, or asset allocation through a graph that pulls live internal knowledge from services running on Kubernetes.
- •Tenant-isolated RAG for private banking
  Run separate namespaces per business unit or region so each wealth team gets isolated data paths, compute limits, and audit boundaries.
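For the tenant-isolated case, each business unit or region needs a namespace name that Kubernetes will accept: a lowercase DNS label of at most 63 characters, made of letters, digits, and hyphens, that starts and ends with an alphanumeric character. The helper below sketches one hypothetical naming scheme (`wealth-<tenant>`); the prefix and the normalization rules are assumptions to adapt.

```python
import re


def tenant_namespace(tenant: str, prefix: str = "wealth") -> str:
    """Derive a DNS-label-safe namespace name from a tenant identifier."""
    # Lowercase, then collapse every run of other characters into one hyphen.
    slug = re.sub(r"[^a-z0-9]+", "-", tenant.lower()).strip("-")
    if not slug:
        raise ValueError("tenant must contain at least one letter or digit")
    # Truncate to the 63-character label limit without leaving a trailing hyphen.
    return f"{prefix}-{slug}"[:63].rstrip("-")


print(tenant_namespace("Private Banking EMEA"))  # wealth-private-banking-emea
```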
By Cyprian Aarons, AI Consultant at Topiax.