How to Integrate FastAPI for fintech with PostgreSQL for RAG
FastAPI for fintech gives you the API layer to expose workflows, auth, and business logic. PostgreSQL gives you durable storage for customer data, embeddings, and retrieval metadata. Put them together and you can build an AI agent that answers regulated finance questions with grounded context pulled from your own transaction, policy, or product corpus.
Prerequisites
- Python 3.10+
- A running PostgreSQL 14+ instance
- A FastAPI app scaffolded and ready to run
- `psycopg` or `asyncpg` installed for PostgreSQL access
- `sqlalchemy[asyncio]` if you want ORM-style integration
- `uvicorn` for local API execution
- Environment variables configured:
  - `DATABASE_URL`
  - `OPENAI_API_KEY` or your embedding provider key
  - any fintech API credentials required by your FastAPI service

Install the core packages (the examples below use the `asyncpg` driver, and `httpx` for the client-side tests):

```shell
pip install fastapi uvicorn asyncpg "sqlalchemy[asyncio]" pydantic httpx
```
Integration Steps
1) Define your PostgreSQL schema for RAG storage
For RAG, don’t just store raw text. Store chunked content, embeddings, and source metadata so retrieval stays auditable.
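Chunking itself happens before anything reaches the database. A minimal character-window chunker with overlap is enough to start; the sizes below are illustrative, not a recommendation:

```python
def split_into_chunks(text: str, size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into fixed-size windows that overlap, so context
    spanning a chunk boundary is not lost at retrieval time."""
    if size <= overlap:
        raise ValueError("size must exceed overlap")
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + size])
        start += size - overlap
    return chunks


# Each chunk would be stored as one rag_chunks row sharing the same doc_id.
parts = split_into_chunks("a" * 1200, size=500, overlap=50)
```

Every chunk keeps its `doc_id`, so an answer can always be traced back to the source document.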
```python
import os

from sqlalchemy import String, Text, Integer, JSON
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://rag_user:rag_pass@localhost:5432/rag_db",
)

engine = create_async_engine(DATABASE_URL, echo=True)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)


class Base(DeclarativeBase):
    pass


class RagChunk(Base):
    __tablename__ = "rag_chunks"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    doc_id: Mapped[str] = mapped_column(String(64), index=True)
    chunk_text: Mapped[str] = mapped_column(Text)
    source_type: Mapped[str] = mapped_column(String(32))  # policy, txn_note, faq
    # "metadata" is reserved on SQLAlchemy declarative classes, so map the
    # attribute under a different name while keeping the column name.
    chunk_metadata: Mapped[dict] = mapped_column("metadata", JSON)
```
If you’re using pgvector, add a vector column for similarity search. In production fintech systems, that’s the cleanest route for semantic retrieval inside Postgres.
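A sketch of what that could look like in SQL. The column name, dimension, and index choice are assumptions, not part of the schema above; `<=>` is pgvector's cosine-distance operator:

```python
# Hypothetical pgvector setup for the rag_chunks table defined earlier.
PGVECTOR_DDL = """
CREATE EXTENSION IF NOT EXISTS vector;
ALTER TABLE rag_chunks ADD COLUMN embedding vector(1536);
"""

# Top-k similarity query: smaller cosine distance means more similar.
PGVECTOR_TOPK_SQL = """
SELECT doc_id, chunk_text
FROM rag_chunks
ORDER BY embedding <=> :query_embedding
LIMIT :k
"""
```

You would run the DDL once per database, then bind `:query_embedding` and `:k` per request.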
2) Build the FastAPI app and wire in the database session
FastAPI handles request validation and endpoint orchestration. Your agent can call these endpoints to ingest documents or retrieve context before answering.
```python
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI(title="Fintech RAG API")


async def get_db() -> AsyncSession:
    async with SessionLocal() as session:
        yield session


@app.get("/health")
async def health():
    return {"status": "ok"}
```
At this point you have the API shell connected to PostgreSQL through an async session dependency. That pattern keeps connection handling predictable under load.
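The lifecycle behind that dependency can be seen without a database: an async context manager guarantees cleanup runs after the response, even if the handler raises. `FakeSession` below is a stand-in for illustration, not SQLAlchemy's API:

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Stand-in for an AsyncSession, just to show the lifecycle."""
    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


@asynccontextmanager
async def session_scope():
    # Mirrors `async with SessionLocal() as session: yield session`.
    session = FakeSession()
    try:
        yield session
    finally:
        await session.close()  # always runs, even on error


async def demo():
    async with session_scope() as session:
        assert not session.closed  # open while the "request" is handled
        return session


s = asyncio.run(demo())
```

After the block exits, `s.closed` is `True`: no leaked connections, which is what keeps pool behavior predictable under load.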
3) Insert fintech knowledge into PostgreSQL from a FastAPI endpoint
This is where your finance domain data enters the RAG pipeline. In a real system this could be product disclosures, underwriting rules, claims notes, or transaction dispute guidance.
```python
from pydantic import BaseModel


class ChunkIn(BaseModel):
    doc_id: str
    chunk_text: str
    source_type: str
    metadata: dict


@app.post("/ingest")
async def ingest_chunk(payload: ChunkIn, db: AsyncSession = Depends(get_db)):
    chunk = RagChunk(
        doc_id=payload.doc_id,
        chunk_text=payload.chunk_text,
        source_type=payload.source_type,
        chunk_metadata=payload.metadata,
    )
    db.add(chunk)
    await db.commit()
    await db.refresh(chunk)
    return {"id": chunk.id, "doc_id": chunk.doc_id}
```
This endpoint uses standard FastAPI request parsing and SQLAlchemy async persistence. For fintech workloads, keep metadata explicit: jurisdiction, product line, version, approval date.
4) Add retrieval logic for RAG queries
For a basic setup you can start with keyword filtering in PostgreSQL. If you have embeddings available, replace this with vector similarity using pgvector.
```python
from sqlalchemy import or_, select


class QueryIn(BaseModel):
    query: str
    source_type: str | None = None


@app.post("/retrieve")
async def retrieve_context(payload: QueryIn, db: AsyncSession = Depends(get_db)):
    stmt = select(RagChunk).limit(5)
    # Naive keyword match on longer query terms; replace with pgvector
    # similarity once embeddings are stored.
    terms = [t for t in payload.query.split() if len(t) > 3]
    if terms:
        stmt = stmt.where(or_(*[RagChunk.chunk_text.ilike(f"%{t}%") for t in terms]))
    if payload.source_type:
        stmt = stmt.where(RagChunk.source_type == payload.source_type)
    result = await db.execute(stmt)
    rows = result.scalars().all()
    return {
        "query": payload.query,
        "contexts": [
            {
                "doc_id": row.doc_id,
                "chunk_text": row.chunk_text,
                "metadata": row.chunk_metadata,
            }
            for row in rows
        ],
    }
```
That gives your agent a retrieval endpoint it can call before generating an answer. In production you’d rank by semantic similarity and add filters for tenant ID and permission scope.
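Once embeddings are stored, that ranking step is just cosine similarity over candidate chunks. A dependency-free sketch of the idea (in production, pgvector does this inside Postgres with an index):

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def rank_chunks(query_emb: list[float],
                chunks: list[tuple[str, list[float]]],
                top_k: int = 5) -> list[str]:
    """Return the top_k chunk texts most similar to the query embedding.
    `chunks` is a list of (chunk_text, embedding) pairs."""
    scored = sorted(chunks,
                    key=lambda c: cosine_similarity(query_emb, c[1]),
                    reverse=True)
    return [text for text, _ in scored[:top_k]]
```

Filters for tenant ID and permission scope would be applied before ranking, so restricted chunks never reach the scoring step.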
5) Connect retrieval to an AI agent workflow
The agent should call PostgreSQL-backed retrieval first, then feed the returned chunks into the model prompt. Keep generation separate from storage so auditing stays simple.
```python
import httpx


async def answer_fintech_question(question: str) -> str:
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        retrieved = await client.post("/retrieve", json={"query": question})
        contexts = retrieved.json()["contexts"]

    prompt_context = "\n\n".join(
        f"[{c['doc_id']}] {c['chunk_text']}" for c in contexts
    )
    # Replace this with your model provider call.
    final_prompt = f"""
Answer using only the context below.

Context:
{prompt_context}

Question:
{question}
"""
    return final_prompt
```
This is the pattern you want in regulated environments:
- retrieve from Postgres
- constrain generation to approved sources
- log inputs and outputs for review
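The logging step is simple to start: an append-only record per interaction. A stdlib-only sketch (in production this would write to durable, access-controlled storage rather than an in-memory list):

```python
import json
import time


def log_interaction(audit_log: list[str], question: str,
                    context_doc_ids: list[str], answer: str) -> None:
    """Append one JSON-lines audit record per agent interaction."""
    audit_log.append(json.dumps({
        "ts": time.time(),
        "question": question,
        "context_doc_ids": context_doc_ids,  # which sources grounded the answer
        "answer": answer,
    }))
```

Recording the `doc_id`s alongside the answer is what lets a reviewer reconstruct exactly which approved sources the model saw.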
Testing the Integration
Run PostgreSQL locally or point to a staging database first. Then start FastAPI:
```shell
uvicorn main:app --reload --port 8000
```
Test ingestion and retrieval:
```python
import asyncio

import httpx


async def test_flow():
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        ingest_resp = await client.post("/ingest", json={
            "doc_id": "pol-001",
            "chunk_text": "Card disputes must be filed within 60 days of statement date.",
            "source_type": "policy",
            "metadata": {"jurisdiction": "US", "version": "1.0"},
        })
        print("INGEST:", ingest_resp.json())

        retrieve_resp = await client.post("/retrieve", json={
            "query": "How long do customers have to file card disputes?",
            "source_type": "policy",
        })
        print("RETRIEVE:", retrieve_resp.json())


asyncio.run(test_flow())
```
Expected output:
```
INGEST: {'id': 1, 'doc_id': 'pol-001'}
RETRIEVE: {'query': 'How long do customers have to file card disputes?', 'contexts': [...]}
```
If ingestion works but retrieval returns empty results:
- check your database URL
- confirm the table exists
- verify the endpoint is filtering on the right `source_type`
- inspect whether transactions are being committed
Real-World Use Cases
- Customer support copilot: retrieve policy snippets from PostgreSQL and answer questions about fees, dispute windows, KYC requirements, or claims status through FastAPI endpoints.
- Internal compliance assistant: store approved regulatory guidance in Postgres and let agents retrieve only versioned content when drafting responses or reviewing case notes.
- Underwriting or claims triage agent: use FastAPI as the orchestration layer and PostgreSQL as the retrieval store for prior decisions, rulesets, and case history tied to each customer or claim.
Keep learning
- The complete AI Agents Roadmap — my full 8-step breakdown
- Free: The AI Agent Starter Kit — PDF checklist + starter code
- Work with me — I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.
Want the complete 8-step roadmap?
Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.