How to Integrate LangGraph for pension funds with Kubernetes for RAG
Combining LangGraph with Kubernetes gives you a clean way to run retrieval-augmented generation (RAG) workflows for pension funds that are stateful, auditable, and horizontally scalable. For pension fund operations that matters, because you often need policy-grounded answers over large document sets, with controlled execution, retries, and deployment isolation.
Prerequisites
- Python 3.10+
- A Kubernetes cluster:
  - local: kind, minikube, or k3d
  - managed: EKS, GKE, or AKS
- `kubectl` configured and pointing at your cluster
- A container registry for pushing images
- Access to your pension fund knowledge sources:
  - policy PDFs
  - investment committee notes
  - member communications
  - actuarial or compliance documents
- API access for your LLM and embedding provider
- Python packages:
  - `langgraph`
  - `langchain`
  - `langchain-openai` (or your model provider package)
  - `langchain-community` (for the FAISS vector store wrapper used below)
  - `kubernetes`
  - `faiss-cpu` or another vector store client
  - `fastapi` and `uvicorn` (for the API wrapper in the deployment step)

Install the core dependencies:

```shell
pip install langgraph langchain langchain-openai langchain-community kubernetes faiss-cpu pydantic fastapi uvicorn
```
Integration Steps
- Build a LangGraph workflow for pension-fund RAG.

  You want the graph to do three things: retrieve relevant context, generate an answer, and persist state between steps. LangGraph's `StateGraph` is the right entry point.
```python
from typing import TypedDict, List

from langgraph.graph import StateGraph, START, END


class RAGState(TypedDict):
    question: str
    context: List[str]
    answer: str


def retrieve(state: RAGState) -> RAGState:
    # Replace with your vector search over pension fund docs
    docs = [
        "Pension scheme withdrawals are subject to trustee approval.",
        "Investment policy statement requires quarterly risk review.",
    ]
    return {**state, "context": docs}


def generate(state: RAGState) -> RAGState:
    prompt = f"""
    Question: {state["question"]}
    Context: {state["context"]}
    Answer as a compliance-safe pension operations assistant.
    """
    # Replace with ChatOpenAI / your model call
    answer = "Withdrawals require trustee approval and must follow scheme rules."
    return {**state, "answer": answer}


graph = StateGraph(RAGState)
graph.add_node("retrieve", retrieve)
graph.add_node("generate", generate)
graph.add_edge(START, "retrieve")
graph.add_edge("retrieve", "generate")
graph.add_edge("generate", END)

app = graph.compile()
```
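LangGraph aside, this linear graph is just function composition over a shared state dict, which is worth sanity-checking before anything touches a cluster. A dependency-free sketch of the same node contract (`retrieve_step` and `generate_step` are stand-ins, not part of the graph above):

```python
# Each node takes the full state and returns a merged copy, never
# mutating in place -- the same contract the LangGraph nodes follow.
def retrieve_step(state: dict) -> dict:
    docs = ["Pension scheme withdrawals are subject to trustee approval."]
    return {**state, "context": docs}

def generate_step(state: dict) -> dict:
    # Stand-in for the model call; echoes how much context was used.
    answer = f"Answer grounded in {len(state['context'])} document(s)."
    return {**state, "answer": answer}

state = {"question": "Can a member withdraw early?", "context": [], "answer": ""}
for node in (retrieve_step, generate_step):  # same order as the graph edges
    state = node(state)

print(state["answer"])  # Answer grounded in 1 document(s).
```

Because each node returns a fresh merged dict, intermediate states stay inspectable, which is what makes the workflow auditable later.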
- Add Kubernetes-aware configuration so the graph can run inside pods.

  In production you should not hardcode endpoints or secrets. Use Kubernetes environment variables and mounted Secrets for model keys and vector store credentials.
```python
import os

from kubernetes import client, config

# In-cluster when running inside a pod; local fallback for dev.
try:
    config.load_incluster_config()
except config.ConfigException:
    config.load_kube_config()

v1 = client.CoreV1Api()
pods = v1.list_namespaced_pod(namespace="default")

OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
PINECONE_INDEX = os.getenv("PINECONE_INDEX", "pension-fund-rag")
NAMESPACE = os.getenv("POD_NAMESPACE", "default")

print(f"Running in namespace={NAMESPACE}, pods={len(pods.items)}")
```
This is the bridge between orchestration and application logic. Kubernetes owns runtime placement; LangGraph owns the workflow state machine.
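Scattered `os.environ` lookups get hard to audit, so it can help to collect them behind a single settings object. A minimal sketch, assuming the same variable names as above (`Settings` and `load_settings` are hypothetical helpers, not library APIs):

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class Settings:
    openai_api_key: str
    pinecone_index: str
    namespace: str

def load_settings(env=os.environ) -> Settings:
    # Fail fast if the required secret is missing; optional values get defaults.
    return Settings(
        openai_api_key=env["OPENAI_API_KEY"],
        pinecone_index=env.get("PINECONE_INDEX", "pension-fund-rag"),
        namespace=env.get("POD_NAMESPACE", "default"),
    )

# In the cluster these come from the mounted Secret; a stub for local runs:
settings = load_settings({"OPENAI_API_KEY": "sk-local-dev"})
print(settings.pinecone_index)  # pension-fund-rag
```

Passing `env` explicitly also makes the configuration unit-testable without touching real environment variables.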
- Wire retrieval to a real vector backend and expose it through the graph.

  For production RAG, replace the dummy context with actual retrieval from your index. The pattern below keeps the graph code clean while pulling from external storage.
```python
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(api_key=OPENAI_API_KEY)

# Load an existing FAISS index from disk or a PVC-mounted volume.
vectorstore = FAISS.load_local(
    folder_path="/mnt/pension-index",
    embeddings=embeddings,
    allow_dangerous_deserialization=True,
)

def retrieve(state: RAGState) -> RAGState:
    docs = vectorstore.similarity_search(state["question"], k=4)
    context = [doc.page_content for doc in docs]
    return {**state, "context": context}
```
If you are using a managed vector DB instead of FAISS, keep the same LangGraph node shape and swap only the retrieval implementation. That keeps your graph portable across environments.
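One way to make that portability concrete is to hide the backend behind a thin interface and build the node from it. A sketch under that assumption (`Retriever`, `InMemoryRetriever`, and `make_retrieve_node` are illustrative names, not LangChain APIs; the keyword-overlap scoring is a toy stand-in for vector similarity):

```python
from typing import List, Protocol

class Retriever(Protocol):
    def search(self, query: str, k: int = 4) -> List[str]: ...

class InMemoryRetriever:
    """Toy backend; FAISS or a managed DB client would sit behind the same interface."""
    def __init__(self, docs: List[str]):
        self.docs = docs

    def search(self, query: str, k: int = 4) -> List[str]:
        # Naive keyword overlap stands in for vector similarity.
        terms = set(query.lower().split())
        def score(doc: str) -> int:
            return len(terms & set(doc.lower().replace(".", "").split()))
        return sorted(self.docs, key=score, reverse=True)[:k]

def make_retrieve_node(retriever: Retriever):
    def retrieve(state: dict) -> dict:
        return {**state, "context": retriever.search(state["question"])}
    return retrieve

node = make_retrieve_node(InMemoryRetriever([
    "Investment policy statement requires quarterly risk review.",
    "Pension scheme withdrawals are subject to trustee approval.",
]))
result = node({"question": "trustee approval for withdrawals"})
print(result["context"][0])  # Pension scheme withdrawals are subject to trustee approval.
```

Swapping FAISS for a managed vector DB then means writing one new class that satisfies `Retriever`; the graph wiring never changes.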
- Package the graph as a service and deploy it on Kubernetes.

  Run the graph behind an API so other services can call it. A simple FastAPI wrapper works well in a cluster.
```python
from fastapi import FastAPI
from pydantic import BaseModel

class Query(BaseModel):
    question: str

api = FastAPI()

@api.post("/rag")
def rag(query: Query):
    result = app.invoke({"question": query.question, "context": [], "answer": ""})
    return {"answer": result["answer"], "context": result["context"]}
```
Then deploy it with a standard Kubernetes manifest:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pension-rag-agent
spec:
  replicas: 2
  selector:
    matchLabels:
      app: pension-rag-agent
  template:
    metadata:
      labels:
        app: pension-rag-agent
    spec:
      containers:
        - name: api
          image: your-registry/pension-rag-agent:latest
          ports:
            - containerPort: 8000
          envFrom:
            - secretRef:
                name: pension-rag-secrets
---
apiVersion: v1
kind: Service
metadata:
  name: pension-rag-agent-svc
spec:
  selector:
    app: pension-rag-agent
  ports:
    - port: 80
      targetPort: 8000
```
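As written, the Service routes traffic to pods the moment their containers start, which is risky for a service that loads a vector index at boot. A sketch of readiness and liveness probes for the `api` container, assuming you add a `/health` route to the FastAPI app (not defined above; delays and periods are illustrative):

```yaml
# Added under the `api` container in the Deployment spec:
readinessProbe:
  httpGet:
    path: /health
    port: 8000
  initialDelaySeconds: 10
livenessProbe:
  httpGet:
    path: /health
    port: 8000
  periodSeconds: 30
```

The readiness probe keeps a pod out of the Service until the index is loaded; the liveness probe restarts it if the process wedges.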
- Use Kubernetes Jobs for ingestion and indexing.

  Do not rebuild indexes inside the request path. Run document ingestion as a Job so your LangGraph runtime stays fast.
```python
from kubernetes import client

batch_v1 = client.BatchV1Api()

job = client.V1Job(
    metadata=client.V1ObjectMeta(name="pension-indexer"),
    spec=client.V1JobSpec(
        template=client.V1PodTemplateSpec(
            metadata=client.V1ObjectMeta(labels={"app": "pension-indexer"}),
            spec=client.V1PodSpec(
                restart_policy="Never",
                containers=[
                    client.V1Container(
                        name="indexer",
                        image="your-registry/pension-indexer:latest",
                        # Pull the key from the same Secret the Deployment uses.
                        env=[
                            client.V1EnvVar(
                                name="OPENAI_API_KEY",
                                value_from=client.V1EnvVarSource(
                                    secret_key_ref=client.V1SecretKeySelector(
                                        name="pension-rag-secrets",
                                        key="OPENAI_API_KEY",
                                    )
                                ),
                            )
                        ],
                    )
                ],
            ),
        )
    ),
)

batch_v1.create_namespaced_job(namespace="default", body=job)
```
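One operational detail: Kubernetes rejects a Job whose name already exists (409 Conflict), so re-running ingestion with a fixed name like `pension-indexer` fails until the old Job is deleted. A small hypothetical helper that timestamps the name instead:

```python
from datetime import datetime, timezone

def unique_job_name(base: str) -> str:
    # Kubernetes object names must be lowercase DNS-style labels;
    # a UTC timestamp suffix keeps re-runs distinct and sortable.
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    return f"{base}-{stamp}"

print(unique_job_name("pension-indexer"))  # e.g. pension-indexer-20250101-120000
```

Pair this with a `ttlSecondsAfterFinished` on the Job spec so completed runs get garbage-collected instead of accumulating.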
Testing the Integration
Use an end-to-end invocation against the deployed service first, then verify pod health separately.
```python
import requests

resp = requests.post(
    "http://pension-rag-agent-svc/rag",
    json={"question": "Can a member withdraw funds before retirement age?"},
)
print(resp.status_code)
print(resp.json())
```
Expected output:
```
200
{
  "answer": "Withdrawals require trustee approval and must follow scheme rules.",
  "context": [
    "Pension scheme withdrawals are subject to trustee approval.",
    "Investment policy statement requires quarterly risk review."
  ]
}
```
If that passes, confirm Kubernetes sees healthy pods:
```shell
kubectl get pods -l app=pension-rag-agent
kubectl logs deploy/pension-rag-agent --tail=50
```
Real-World Use Cases
- Member support assistant that answers retirement eligibility, withdrawal rules, and contribution questions from approved pension documents.
- Compliance copilot that checks whether an internal response aligns with trustee-approved policy language before it goes out.
- Investment operations agent that retrieves IPS clauses, summarizes quarterly obligations, and routes exceptions into human review workflows.
Keep learning
- The complete AI Agents Roadmap: my full 8-step breakdown
- Free: The AI Agent Starter Kit (PDF checklist + starter code)
- Work with me: I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.