How to Integrate LangGraph with Kubernetes for Wealth Management RAG

By Cyprian Aarons · Updated 2026-04-21

Wealth management RAG systems need two things that usually fight each other: strong orchestration and predictable infrastructure. LangGraph gives you the graph-based control flow for retrieval, review, and compliance checks, while Kubernetes gives you the operational layer to run those agents, retrievers, and vector services with proper scaling and isolation.

Prerequisites

  • Python 3.10+
  • A Kubernetes cluster with kubectl access
  • kubernetes Python client installed
  • LangGraph installed with your wealth-management agent package
  • Access to a vector store or document service for RAG
  • A namespace in Kubernetes for your agent workloads
  • Service account credentials or in-cluster RBAC configured

Install the Python dependencies:

pip install langgraph kubernetes pydantic requests

Integration Steps

  1. Define the LangGraph workflow for wealth management RAG

Start with a graph that takes a client query, retrieves portfolio or policy documents, then routes through a compliance check before generating the final answer.

from typing import TypedDict, List
from langgraph.graph import StateGraph, END

class WealthState(TypedDict):
    query: str
    retrieved_docs: List[str]
    compliance_passed: bool
    answer: str

def retrieve_docs(state: WealthState):
    query = state["query"]
    docs = [f"Retrieved wealth policy context for: {query}"]
    return {"retrieved_docs": docs}

def compliance_check(state: WealthState):
    docs = state["retrieved_docs"]
    passed = any("policy" in doc.lower() for doc in docs)
    return {"compliance_passed": passed}

def generate_answer(state: WealthState):
    if not state["compliance_passed"]:
        return {"answer": "Request blocked by compliance policy."}
    return {"answer": f"Answer based on {len(state['retrieved_docs'])} retrieved documents."}

graph = StateGraph(WealthState)
graph.add_node("retrieve_docs", retrieve_docs)
graph.add_node("compliance_check", compliance_check)
graph.add_node("generate_answer", generate_answer)

graph.set_entry_point("retrieve_docs")
graph.add_edge("retrieve_docs", "compliance_check")
graph.add_edge("compliance_check", "generate_answer")
graph.add_edge("generate_answer", END)

app = graph.compile()

  2. Use Kubernetes to discover your RAG services

In production, your retriever or vector DB will usually sit behind a Kubernetes Service. Use the official Kubernetes Python client to resolve the service endpoint before calling it from your LangGraph node.

from kubernetes import client, config

def get_vector_service_endpoint(namespace: str = "wealth-ai", service_name: str = "vector-db"):
    try:
        config.load_incluster_config()
    except Exception:
        config.load_kube_config()

    v1 = client.CoreV1Api()
    svc = v1.read_namespaced_service(service_name, namespace)
    ip = svc.spec.cluster_ip
    port = svc.spec.ports[0].port
    return f"http://{ip}:{port}"

endpoint = get_vector_service_endpoint()
print(endpoint)
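When the agent itself runs in-cluster, you can skip the API lookup entirely and rely on Kubernetes DNS for Services; a minimal sketch, where the port is an assumption — use whatever your Service actually exposes:

```python
def get_vector_service_dns(namespace: str = "wealth-ai",
                           service_name: str = "vector-db",
                           port: int = 8080) -> str:
    """Build the in-cluster DNS URL for a Service.

    Kubernetes resolves <service>.<namespace>.svc.cluster.local to the
    Service's ClusterIP, so no API credentials are needed at runtime.
    """
    return f"http://{service_name}.{namespace}.svc.cluster.local:{port}"

print(get_vector_service_dns())
# http://vector-db.wealth-ai.svc.cluster.local:8080
```

The API-based lookup is still useful outside the cluster or when you need the Service's declared port rather than a hard-coded one.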

  3. Wire the Kubernetes-backed retriever into LangGraph

Now connect the graph node to your actual RAG service. This example calls an HTTP retriever exposed through Kubernetes.

import requests

def retrieve_from_k8s_rag(state: WealthState):
    endpoint = get_vector_service_endpoint()
    resp = requests.post(
        f"{endpoint}/search",
        json={"query": state["query"], "top_k": 3},
        timeout=10,
    )
    resp.raise_for_status()
    results = resp.json()["results"]
    return {"retrieved_docs": [item["text"] for item in results]}

graph = StateGraph(WealthState)
graph.add_node("retrieve_docs", retrieve_from_k8s_rag)
graph.add_node("compliance_check", compliance_check)
graph.add_node("generate_answer", generate_answer)

graph.set_entry_point("retrieve_docs")
graph.add_edge("retrieve_docs", "compliance_check")
graph.add_edge("compliance_check", "generate_answer")
graph.add_edge("generate_answer", END)

app = graph.compile()

  4. Run the workflow from inside Kubernetes

For production use, deploy the agent as a container in the same cluster so it can talk to internal Services directly. The code below shows a simple invocation path you can put behind an API endpoint or job worker.

def run_wealth_rag(query: str):
    result = app.invoke({"query": query, "retrieved_docs": [], "compliance_passed": False, "answer": ""})
    return result["answer"]

if __name__ == "__main__":
    answer = run_wealth_rag("Can this client increase equity exposure?")
    print(answer)
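The invocation path above can sit behind a tiny HTTP endpoint. A stdlib-only sketch where handle_query is a stand-in for run_wealth_rag (a production service would more likely use FastAPI or similar):

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

def handle_query(query: str) -> dict:
    # Stand-in for run_wealth_rag; swap in the real graph invocation.
    return {"answer": f"stubbed answer for: {query}"}

class AgentHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length) or b"{}")
        body = json.dumps(handle_query(payload.get("query", ""))).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

# Blocking call; run it as the container's entrypoint:
# HTTPServer(("0.0.0.0", 8080), AgentHandler).serve_forever()
```

Expose the port through a Service and the agent becomes just another in-cluster endpoint, reachable the same way as the vector store.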

  5. Deploy with a Kubernetes-native execution model

If you want stronger isolation per request or per tenant, wrap each run in a Job or use a worker deployment backed by a queue. Here’s a minimal example that creates a Job using the Kubernetes API.

from kubernetes import client

def create_agent_job(namespace="wealth-ai"):
    batch_v1 = client.BatchV1Api()

    job_manifest = client.V1Job(
        # generate_name avoids collisions when Jobs are created repeatedly
        metadata=client.V1ObjectMeta(generate_name="wealth-rag-agent-"),
        spec=client.V1JobSpec(
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"app": "wealth-rag-agent"}),
                spec=client.V1PodSpec(
                    restart_policy="Never",
                    containers=[
                        client.V1Container(
                            name="agent",
                            image="your-registry/wealth-rag-agent:latest",
                            env=[client.V1EnvVar(name="AGENT_NAMESPACE", value=namespace)],
                        )
                    ],
                ),
            ),
            backoff_limit=2,
        ),
    )

    return batch_v1.create_namespaced_job(namespace=namespace, body=job_manifest)

# create_agent_job()
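Once the Job is created you'll usually poll its status before collecting results. A simplified sketch of the decision logic, kept as a pure function so it's testable without a cluster; the inputs mirror the V1JobStatus counters and the backoff_limit matches the manifest above:

```python
def classify_job_status(succeeded, failed, backoff_limit: int = 2) -> str:
    """Map V1JobStatus counters to a coarse state.

    The API leaves `succeeded` / `failed` as None until pods report in,
    and the Job gives up once failures exceed backoff_limit.
    """
    if succeeded:
        return "succeeded"
    if failed is not None and failed > backoff_limit:
        return "failed"
    return "running"

# With a live cluster, feed it the real Job status, e.g.:
# status = client.BatchV1Api().read_namespaced_job(job_name, "wealth-ai").status
# print(classify_job_status(status.succeeded, status.failed))
```

Because create_namespaced_job returns the created object, read the generated name from its metadata before polling.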

Testing the Integration

Use a real query and verify that LangGraph completes the retrieval and compliance path while Kubernetes resolves the backing service.

result = app.invoke({
    "query": "What is the recommended allocation for moderate risk clients?",
    "retrieved_docs": [],
    "compliance_passed": False,
    "answer": ""
})

print(result["retrieved_docs"])
print(result["compliance_passed"])
print(result["answer"])

Expected output with the placeholder retriever from step 1:

['Retrieved wealth policy context for: What is the recommended allocation for moderate risk clients?']
True
Answer based on 1 retrieved documents.

If your retriever is wired to a real Kubernetes-hosted vector service, you should see actual document snippets instead of placeholder text.

Real-World Use Cases

  • Advisor copilot with policy gating
    Build an agent that answers portfolio questions only after retrieving approved product disclosures and running compliance checks.

  • Client servicing automation
    Route inbound questions about retirement plans, account changes, or asset allocation through a graph that pulls live internal knowledge from services running on Kubernetes.

  • Tenant-isolated RAG for private banking
    Run separate namespaces per business unit or region so each wealth team gets isolated data paths, compute limits, and audit boundaries.
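The tenant-isolation pattern often reduces to a namespace naming convention. A minimal sketch with illustrative tenant and namespace names — your own mapping would come from configuration, not a hard-coded dict:

```python
TENANT_NAMESPACES = {
    "private-banking-emea": "wealth-ai-emea",
    "private-banking-apac": "wealth-ai-apac",
}

def endpoint_for_tenant(tenant: str,
                        service_name: str = "vector-db",
                        port: int = 8080) -> str:
    """Resolve a tenant's retriever via its dedicated namespace."""
    try:
        namespace = TENANT_NAMESPACES[tenant]
    except KeyError:
        raise ValueError(f"unknown tenant: {tenant}") from None
    return f"http://{service_name}.{namespace}.svc.cluster.local:{port}"
```

Pair the mapping with NetworkPolicies and ResourceQuotas per namespace so the isolation is enforced by the cluster, not just by routing code.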


Keep learning

By Cyprian Aarons, AI Consultant at Topiax.
