How to Integrate LangChain for wealth management with PostgreSQL for AI agents

By Cyprian AaronsUpdated 2026-04-21
langchain-for-wealth-managementpostgresqlai-agents

Combining LangChain for wealth management with PostgreSQL gives you a practical agent backend: the model reasons over client context, portfolio data, and policy rules, while PostgreSQL stores durable state, audit trails, and retrieval-ready records. That matters when you need an AI agent that can answer client questions, summarize holdings, flag risk exposure, and persist every interaction for compliance.

Prerequisites

  • Python 3.10+
  • A PostgreSQL instance running locally or in your cloud environment
  • A database user with CREATE TABLE, SELECT, INSERT, and UPDATE permissions
  • LangChain installed with the relevant integrations for your wealth management workflow
  • psycopg2-binary or psycopg installed for PostgreSQL access
  • Environment variables configured:
    • OPENAI_API_KEY or your model provider key
    • POSTGRES_HOST
    • POSTGRES_PORT
    • POSTGRES_DB
    • POSTGRES_USER
    • POSTGRES_PASSWORD

Install the core packages:

pip install langchain langchain-openai psycopg2-binary sqlalchemy

Integration Steps

1) Set up your PostgreSQL connection

Use SQLAlchemy for clean connection handling and reuse it across your agent services. For production systems, keep credentials in environment variables and never hardcode them.

import os
from sqlalchemy import create_engine, text

POSTGRES_URI = (
    f"postgresql+psycopg2://{os.environ['POSTGRES_USER']}:"
    f"{os.environ['POSTGRES_PASSWORD']}@{os.environ['POSTGRES_HOST']}:"
    f"{os.environ.get('POSTGRES_PORT', '5432')}/{os.environ['POSTGRES_DB']}"
)

engine = create_engine(POSTGRES_URI, pool_pre_ping=True)

with engine.connect() as conn:
    result = conn.execute(text("SELECT version();"))
    print(result.fetchone()[0])

Create a table to store wealth management conversations and agent outputs:

from sqlalchemy import text

create_sql = """
CREATE TABLE IF NOT EXISTS wealth_agent_sessions (
    id SERIAL PRIMARY KEY,
    client_id TEXT NOT NULL,
    question TEXT NOT NULL,
    answer TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
"""

with engine.begin() as conn:
    conn.execute(text(create_sql))

2) Initialize the LangChain model

For wealth management workflows, keep the prompt constrained. The agent should summarize data, explain portfolio metrics, and avoid making unsupported investment recommendations.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a wealth management assistant. Be precise, conservative, and compliance-aware."),
    ("user", "Client question: {question}\nPortfolio context: {context}")
])

chain = prompt | llm

If you already have a LangChain-based wealth management pipeline, this is where you plug in your domain prompt and any retrieval layer tied to client holdings or policy docs.

3) Pull portfolio context from PostgreSQL

Store structured portfolio data in PostgreSQL so the agent can retrieve current facts before generating a response. This keeps answers grounded in your source of truth.

def get_client_portfolio(client_id: str):
    query = text("""
        SELECT asset_class, symbol, market_value, risk_bucket
        FROM client_portfolios
        WHERE client_id = :client_id
        ORDER BY market_value DESC;
    """)

    with engine.connect() as conn:
        rows = conn.execute(query, {"client_id": client_id}).mappings().all()

    return [
        {
            "asset_class": row["asset_class"],
            "symbol": row["symbol"],
            "market_value": float(row["market_value"]),
            "risk_bucket": row["risk_bucket"],
        }
        for row in rows
    ]

Convert the retrieved rows into compact context for the LLM:

def format_context(portfolio_rows):
    lines = []
    for row in portfolio_rows:
        lines.append(
            f"{row['symbol']} | {row['asset_class']} | "
            f"${row['market_value']:,.2f} | risk={row['risk_bucket']}"
        )
    return "\n".join(lines)

4) Generate an answer with LangChain and persist it back to PostgreSQL

This is the core loop: fetch state from Postgres, pass it into LangChain, then store the response for auditability.

def answer_client_question(client_id: str, question: str):
    portfolio_rows = get_client_portfolio(client_id)
    context = format_context(portfolio_rows)

    response = chain.invoke({
        "question": question,
        "context": context or "No portfolio records found."
    })

    answer_text = response.content if hasattr(response, "content") else str(response)

    insert_sql = text("""
        INSERT INTO wealth_agent_sessions (client_id, question, answer)
        VALUES (:client_id, :question, :answer)
    """)

    with engine.begin() as conn:
        conn.execute(insert_sql, {
            "client_id": client_id,
            "question": question,
            "answer": answer_text,
        })

    return answer_text

Example call:

result = answer_client_question(
    client_id="CUST-10021",
    question="What is my largest concentration risk right now?"
)
print(result)

5) Add a retrieval-friendly query pattern for agent memory

For multi-turn workflows, agents need recent history. Keep a session table in PostgreSQL and load the latest turns before each generation step.

def get_recent_session_history(client_id: str, limit: int = 5):
    query = text("""
        SELECT question, answer
        FROM wealth_agent_sessions
        WHERE client_id = :client_id
        ORDER BY created_at DESC
        LIMIT :limit;
    """)

    with engine.connect() as conn:
        rows = conn.execute(query, {"client_id": client_id, "limit": limit}).mappings().all()

    return list(reversed([
        {"question": row["question"], "answer": row["answer"]}
        for row in rows
    ]))

You can then prepend this history into your LangChain prompt before calling .invoke().

Testing the Integration

Run a simple end-to-end check against a known client ID. This verifies database connectivity, retrieval logic, model invocation, and persistence.

client_id = "CUST-10021"
question = "Summarize my top three holdings and call out any risk concentration."

answer = answer_client_question(client_id=client_id, question=question)
print("ANSWER:\n", answer)

with engine.connect() as conn:
    rows = conn.execute(text("""
        SELECT client_id, question, answer
        FROM wealth_agent_sessions
        WHERE client_id = :client_id
        ORDER BY created_at DESC
        LIMIT 1;
    """), {"client_id": client_id}).mappings().all()

print("\nSAVED RECORD:\n", rows[0])

Expected output:

ANSWER:
Your top holdings are concentrated in ...

SAVED RECORD:
{'client_id': 'CUST-10021', 'question': 'Summarize my top three holdings and call out any risk concentration.', 'answer': '...'}

Real-World Use Cases

  • Client-facing portfolio Q&A agents that explain holdings, performance drivers, and concentration risk using live data from PostgreSQL.
  • Advisor copilot systems that summarize recent interactions and generate compliant follow-up notes stored back into PostgreSQL.
  • Ops agents that detect stale positions or unusual allocations by querying PostgreSQL tables before drafting alerts or task lists.

If you want to take this further in production:

  • Add row-level security for advisor/client separation.
  • Store embeddings for policy docs or statements in Postgres using pgvector.
  • Wrap every LLM response with validation rules before writing it back to the database.

Keep learning

By Cyprian Aarons, AI Consultant at Topiax.

Want the complete 8-step roadmap?

Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.

Get the Starter Kit

Related Guides