How to Integrate LangGraph for pension funds with Kubernetes for multi-agent systems
Integrating LangGraph with Kubernetes gives you a clean way to run pension-fund workflows as durable multi-agent systems. LangGraph handles the stateful orchestration between agents, while Kubernetes gives you isolation, scaling, and deployment control for regulated workloads like contribution checks, retirement eligibility reviews, and document verification.
Prerequisites
- •Python 3.10+
- •A running Kubernetes cluster
- •local:
kind,minikube, ork3d - •remote: EKS, GKE, AKS
- •local:
- •
kubectlconfigured against your cluster - •Access to a container registry for agent images
- •LangGraph installed:
- •
pip install langgraph langchain
- •
- •Kubernetes Python client installed:
- •
pip install kubernetes
- •
- •A service account or kubeconfig with permissions to:
- •create
Jobs - •read
Pods - •watch job status
- •create
- •Environment variables set for your agent runtime:
- •
KUBECONFIG - •model provider keys if your agents call an LLM
- •
Integration Steps
- •Define your LangGraph state and agent nodes
For pension-fund workflows, keep the graph explicit. Each node should represent a business action like eligibility validation, contribution reconciliation, or escalation to a human reviewer.
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph, START, END
class PensionState(TypedDict):
member_id: str
documents: List[str]
eligibility_status: Optional[str]
reconciliation_status: Optional[str]
approval_status: Optional[str]
def eligibility_agent(state: PensionState) -> PensionState:
# Replace with real policy logic or model call
state["eligibility_status"] = "eligible" if state["member_id"].startswith("P") else "review"
return state
def reconciliation_agent(state: PensionState) -> PensionState:
state["reconciliation_status"] = "matched"
return state
def approval_agent(state: PensionState) -> PensionState:
state["approval_status"] = "approved" if state["eligibility_status"] == "eligible" else "manual_review"
return state
graph = StateGraph(PensionState)
graph.add_node("eligibility", eligibility_agent)
graph.add_node("reconciliation", reconciliation_agent)
graph.add_node("approval", approval_agent)
graph.add_edge(START, "eligibility")
graph.add_edge("eligibility", "reconciliation")
graph.add_edge("reconciliation", "approval")
graph.add_edge("approval", END)
app = graph.compile()
- •Wrap each agent run in a Kubernetes Job
Use Kubernetes Jobs when you want each agent step to run in an isolated pod. That’s the right pattern for pension systems where you want auditability and failure isolation.
from kubernetes import client, config
import json
import uuid
config.load_kube_config()
batch_v1 = client.BatchV1Api()
namespace = "pension-agents"
def create_agent_job(agent_name: str, payload: dict) -> str:
job_name = f"{agent_name}-{uuid.uuid4().hex[:8]}"
container = client.V1Container(
name=agent_name,
image="ghcr.io/your-org/pension-agent:latest",
command=["python", "-m", "agent_runner"],
args=[json.dumps(payload)],
env=[
client.V1EnvVar(name="AGENT_NAME", value=agent_name),
],
)
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={"app": agent_name}),
spec=client.V1PodSpec(
restart_policy="Never",
containers=[container],
),
)
job_spec = client.V1JobSpec(template=template, backoff_limit=2)
job = client.V1Job(
api_version="batch/v1",
kind="Job",
metadata=client.V1ObjectMeta(name=job_name),
spec=job_spec,
)
batch_v1.create_namespaced_job(namespace=namespace, body=job)
return job_name
- •Call Kubernetes from inside LangGraph nodes
This is the integration point. Each node submits work to Kubernetes and returns the status back into graph state.
from kubernetes import client, config
config.load_kube_config()
core_v1 = client.CoreV1Api()
batch_v1 = client.BatchV1Api()
def wait_for_job(job_name: str, namespace: str = "pension-agents") -> str:
while True:
job = batch_v1.read_namespaced_job(name=job_name, namespace=namespace)
if job.status.succeeded:
return "succeeded"
if job.status.failed and job.status.failed > 0:
return "failed"
def kubernetes_eligibility_node(state: PensionState) -> PensionState:
job_name = create_agent_job("eligibility", {"member_id": state["member_id"]})
status = wait_for_job(job_name)
state["eligibility_status"] = status
return state
def kubernetes_reconciliation_node(state: PensionState) -> PensionState:
job_name = create_agent_job("reconciliation", {"documents": state["documents"]})
status = wait_for_job(job_name)
state["reconciliation_status"] = status
return state
def kubernetes_approval_node(state: PensionState) -> PensionState:
job_name = create_agent_job("approval", {"state": dict(state)})
status = wait_for_job(job_name)
state["approval_status"] = status
return state
- •Replace local nodes with Kubernetes-backed nodes in the graph
Keep the orchestration in LangGraph and push execution into Kubernetes. This gives you one control plane for workflow logic and another for runtime scheduling.
from langgraph.graph import StateGraph, START, END
k8s_graph = StateGraph(PensionState)
k8s_graph.add_node("eligibility", kubernetes_eligibility_node)
k8s_graph.add_node("reconciliation", kubernetes_reconciliation_node)
k8s_graph.add_node("approval", kubernetes_approval_node)
k8s_graph.add_edge(START, "eligibility")
k8s_graph.add_edge("eligibility", "reconciliation")
k8s_graph.add_edge("reconciliation", "approval")
k8s_graph.add_edge("approval", END)
k8s_app = k8s_graph.compile()
result = k8s_app.invoke({
"member_id": "P12345",
"documents": ["id.pdf", "contribution_statement.pdf"],
"eligibility_status": None,
"reconciliation_status": None,
"approval_status": None,
})
print(result)
- •Add pod log inspection for audit trails
For pension operations, you need traceability. Pull pod logs after each Job completes so you can store them alongside workflow metadata.
def get_job_pod_logs(job_label: str, namespace: str = "pension-agents") -> str:
pods = core_v1.list_namespaced_pod(namespace=namespace, label_selector=f"app={job_label}")
if not pods.items:
return ""
pod_name = pods.items[0].metadata.name
return core_v1.read_namespaced_pod_log(name=pod_name, namespace=namespace)
logs = get_job_pod_logs("eligibility")
print(logs)
Testing the Integration
Run a single end-to-end invocation against a test namespace. You want to confirm that LangGraph advances through all nodes and that Kubernetes actually schedules the jobs.
test_input = {
"member_id": "P77881",
"documents": ["kyc.pdf", "beneficiary_form.pdf"],
"eligibility_status": None,
"reconciliation_status": None,
}
output = k8s_app.invoke(test_input)
print(output["eligibility_status"])
print(output["reconciliation_status"])
print(output["approval_status"])
Expected output:
succeeded
succeeded
approved
If you get failed, inspect the Job events:
events = core_v1.list_namespaced_event(namespace="pension-agents")
for event in events.items[:5]:
print(event.message)
Real-World Use Cases
- •
Retirement benefit processing
- •One agent validates member eligibility.
- •Another reconciles contribution history.
- •A third generates an approval packet for ops review.
- •
Claims and beneficiary verification
- •Run document extraction in one pod.
- •Run policy checks in another.
- •Escalate exceptions to a human-review node in LangGraph.
- •
Regulatory reporting pipelines
- •Split reporting into separate agents for data quality checks, anomaly detection, and filing preparation.
- •Use Kubernetes Jobs for isolated execution and repeatable runs per reporting cycle.
Keep learning
- •The complete AI Agents Roadmap — my full 8-step breakdown
- •Free: The AI Agent Starter Kit — PDF checklist + starter code
- •Work with me — I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.
Want the complete 8-step roadmap?
Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.
Get the Starter Kit