How to Integrate LangChain for wealth management with PostgreSQL for AI agents
Combining LangChain for wealth management with PostgreSQL gives you a practical agent backend: the model reasons over client context, portfolio data, and policy rules, while PostgreSQL stores durable state, audit trails, and retrieval-ready records. That matters when you need an AI agent that can answer client questions, summarize holdings, flag risk exposure, and persist every interaction for compliance.
Prerequisites
- Python 3.10+
- A PostgreSQL instance running locally or in your cloud environment
- A database user with `CREATE TABLE`, `SELECT`, `INSERT`, and `UPDATE` permissions
- LangChain installed with the relevant integrations for your wealth management workflow
- `psycopg2-binary` or `psycopg` installed for PostgreSQL access
- Environment variables configured:
  - `OPENAI_API_KEY` or your model provider key
  - `POSTGRES_HOST`
  - `POSTGRES_PORT`
  - `POSTGRES_DB`
  - `POSTGRES_USER`
  - `POSTGRES_PASSWORD`
Install the core packages:
```shell
pip install langchain langchain-openai psycopg2-binary sqlalchemy
```
Integration Steps
1) Set up your PostgreSQL connection
Use SQLAlchemy for clean connection handling and reuse it across your agent services. For production systems, keep credentials in environment variables and never hardcode them.
```python
import os

from sqlalchemy import create_engine, text

POSTGRES_URI = (
    f"postgresql+psycopg2://{os.environ['POSTGRES_USER']}:"
    f"{os.environ['POSTGRES_PASSWORD']}@{os.environ['POSTGRES_HOST']}:"
    f"{os.environ.get('POSTGRES_PORT', '5432')}/{os.environ['POSTGRES_DB']}"
)

engine = create_engine(POSTGRES_URI, pool_pre_ping=True)

# Quick connectivity check
with engine.connect() as conn:
    result = conn.execute(text("SELECT version();"))
    print(result.fetchone()[0])
```
Create a table to store wealth management conversations and agent outputs:
```python
from sqlalchemy import text

create_sql = """
CREATE TABLE IF NOT EXISTS wealth_agent_sessions (
    id SERIAL PRIMARY KEY,
    client_id TEXT NOT NULL,
    question TEXT NOT NULL,
    answer TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
"""

with engine.begin() as conn:
    conn.execute(text(create_sql))
```
2) Initialize the LangChain model
For wealth management workflows, keep the prompt constrained. The agent should summarize data, explain portfolio metrics, and avoid making unsupported investment recommendations.
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a wealth management assistant. Be precise, conservative, and compliance-aware."),
    ("user", "Client question: {question}\nPortfolio context: {context}")
])

chain = prompt | llm
```
If you already have a LangChain-based wealth management pipeline, this is where you plug in your domain prompt and any retrieval layer tied to client holdings or policy docs.
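Whatever the retrieval layer is, the simplest way to plug it in here is to merge retrieved snippets into the same `{context}` variable the prompt already uses. A minimal sketch of that merge step; where the snippets come from (a policy table, a `pgvector` similarity search) depends on your stack:

```python
def assemble_context(portfolio_text: str, policy_snippets: list[str]) -> str:
    """Merge live portfolio data with retrieved policy excerpts into one
    string for the {context} slot of the prompt."""
    parts = [f"Portfolio:\n{portfolio_text}"]
    if policy_snippets:
        bullets = "\n".join(f"- {snippet}" for snippet in policy_snippets)
        parts.append(f"Relevant policy excerpts:\n{bullets}")
    return "\n\n".join(parts)
```

Keeping the merge in one helper means the prompt template never changes as you swap retrieval strategies.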
3) Pull portfolio context from PostgreSQL
Store structured portfolio data in PostgreSQL so the agent can retrieve current facts before generating a response. This keeps answers grounded in your source of truth.
```python
def get_client_portfolio(client_id: str):
    query = text("""
        SELECT asset_class, symbol, market_value, risk_bucket
        FROM client_portfolios
        WHERE client_id = :client_id
        ORDER BY market_value DESC;
    """)
    with engine.connect() as conn:
        rows = conn.execute(query, {"client_id": client_id}).mappings().all()
    return [
        {
            "asset_class": row["asset_class"],
            "symbol": row["symbol"],
            "market_value": float(row["market_value"]),
            "risk_bucket": row["risk_bucket"],
        }
        for row in rows
    ]
```
Convert the retrieved rows into compact context for the LLM:
```python
def format_context(portfolio_rows):
    lines = []
    for row in portfolio_rows:
        lines.append(
            f"{row['symbol']} | {row['asset_class']} | "
            f"${row['market_value']:,.2f} | risk={row['risk_bucket']}"
        )
    return "\n".join(lines)
```
4) Generate an answer with LangChain and persist it back to PostgreSQL
This is the core loop: fetch state from Postgres, pass it into LangChain, then store the response for auditability.
```python
def answer_client_question(client_id: str, question: str):
    portfolio_rows = get_client_portfolio(client_id)
    context = format_context(portfolio_rows)

    response = chain.invoke({
        "question": question,
        "context": context or "No portfolio records found."
    })
    answer_text = response.content if hasattr(response, "content") else str(response)

    insert_sql = text("""
        INSERT INTO wealth_agent_sessions (client_id, question, answer)
        VALUES (:client_id, :question, :answer)
    """)
    with engine.begin() as conn:
        conn.execute(insert_sql, {
            "client_id": client_id,
            "question": question,
            "answer": answer_text,
        })
    return answer_text
```
Example call:
```python
result = answer_client_question(
    client_id="CUST-10021",
    question="What is my largest concentration risk right now?"
)
print(result)
```
5) Add a retrieval-friendly query pattern for agent memory
For multi-turn workflows, agents need recent history. Keep a session table in PostgreSQL and load the latest turns before each generation step.
```python
def get_recent_session_history(client_id: str, limit: int = 5):
    query = text("""
        SELECT question, answer
        FROM wealth_agent_sessions
        WHERE client_id = :client_id
        ORDER BY created_at DESC
        LIMIT :limit;
    """)
    with engine.connect() as conn:
        rows = conn.execute(query, {"client_id": client_id, "limit": limit}).mappings().all()
    # Rows come back newest-first; reverse so the prompt reads chronologically
    return list(reversed([
        {"question": row["question"], "answer": row["answer"]}
        for row in rows
    ]))
```
You can then prepend this history into your LangChain prompt before calling `.invoke()`.
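One way to do that prepending, while keeping the single-variable prompt style used above, is to render the turns as a compact transcript that fills a `{history}` slot in the template. A sketch (the `Client:`/`Assistant:` labels are arbitrary choices):

```python
def format_history(history: list[dict]) -> str:
    """Render prior Q&A turns (as returned by get_recent_session_history)
    into a compact transcript for the prompt."""
    lines = []
    for turn in history:
        lines.append(f"Client: {turn['question']}")
        lines.append(f"Assistant: {turn['answer']}")
    return "\n".join(lines)
```

You would then add a line such as `Recent history: {history}` to the user message in `ChatPromptTemplate.from_messages` and pass `history=format_history(...)` into `chain.invoke`.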
Testing the Integration
Run a simple end-to-end check against a known client ID. This verifies database connectivity, retrieval logic, model invocation, and persistence.
```python
client_id = "CUST-10021"
question = "Summarize my top three holdings and call out any risk concentration."

answer = answer_client_question(client_id=client_id, question=question)
print("ANSWER:\n", answer)

# Confirm the interaction was persisted
with engine.connect() as conn:
    rows = conn.execute(text("""
        SELECT client_id, question, answer
        FROM wealth_agent_sessions
        WHERE client_id = :client_id
        ORDER BY created_at DESC
        LIMIT 1;
    """), {"client_id": client_id}).mappings().all()

print("\nSAVED RECORD:\n", rows[0])
```
Expected output:
```
ANSWER:
 Your top holdings are concentrated in ...

SAVED RECORD:
 {'client_id': 'CUST-10021', 'question': 'Summarize my top three holdings and call out any risk concentration.', 'answer': '...'}
```
Real-World Use Cases
- Client-facing portfolio Q&A agents that explain holdings, performance drivers, and concentration risk using live data from PostgreSQL.
- Advisor copilot systems that summarize recent interactions and generate compliant follow-up notes stored back into PostgreSQL.
- Ops agents that detect stale positions or unusual allocations by querying PostgreSQL tables before drafting alerts or task lists.

If you want to take this further in production:

- Add row-level security for advisor/client separation.
- Store embeddings for policy docs or statements in Postgres using `pgvector`.
- Wrap every LLM response with validation rules before writing it back to the database.
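The last point can start small: a deterministic gate placed in front of the `INSERT` in `answer_client_question`. A minimal sketch; the blocked phrases below are illustrative placeholders, not a real compliance list:

```python
import re

# Phrases suggesting unsupported investment advice; illustrative only.
FORBIDDEN_PATTERNS = [
    re.compile(r"\bguaranteed return", re.IGNORECASE),
    re.compile(r"\byou should (buy|sell)\b", re.IGNORECASE),
]

def validate_answer(answer: str, max_len: int = 4000):
    """Return (ok, reason); reject empty, oversized, or advice-like answers
    before they are written to wealth_agent_sessions."""
    if not answer.strip():
        return False, "empty answer"
    if len(answer) > max_len:
        return False, "answer too long"
    for pattern in FORBIDDEN_PATTERNS:
        if pattern.search(answer):
            return False, f"blocked phrase: {pattern.pattern}"
    return True, "ok"
```

A rejected answer can be logged with its reason instead of stored, giving compliance a review queue rather than a silent failure.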
Keep learning
- The complete AI Agents Roadmap: my full 8-step breakdown
- Free: The AI Agent Starter Kit (PDF checklist + starter code)
- Work with me: I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.