How to Integrate FastAPI with PostgreSQL for Investment Banking Multi-Agent Systems
FastAPI gives you a clean API layer for investment banking workflows, and PostgreSQL gives your agent system durable state, auditability, and queryable memory. Put them together and you can build multi-agent systems that ingest market data, coordinate trade or research tasks, and persist every decision for compliance and replay.
Prerequisites
- Python 3.11+
- A running PostgreSQL instance
- `fastapi`
- `uvicorn`
- `psycopg[binary]` or `psycopg2-binary`
- `sqlalchemy`
- `pydantic`
- Access to your investment banking FastAPI service endpoints
- A `.env` file with database credentials
- Basic understanding of async Python and REST APIs
Integration Steps
Step 1: Set up the PostgreSQL connection layer
Use SQLAlchemy for connection management and keep the engine reusable across agents. For investment banking systems, this should be configured with SSL if you’re connecting to managed Postgres.
```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+psycopg://bank_user:bank_pass@localhost:5432/agentdb"

engine = create_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
)

SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)


def get_db():
    """FastAPI dependency that yields a session and always closes it."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def ping_db():
    """Cheap connectivity check, useful for health endpoints."""
    with engine.connect() as conn:
        return conn.execute(text("SELECT 1")).scalar_one()
```
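For managed Postgres that enforces TLS, `sslmode` can be passed through `connect_args`; a minimal sketch (whether you need `require` or `verify-full` depends on your provider, and the URL here is a placeholder):

```python
from sqlalchemy import create_engine

# "require" enforces TLS; "verify-full" additionally validates the server
# certificate against a CA bundle passed as sslrootcert.
SSL_CONNECT_ARGS = {"sslmode": "require"}


def make_ssl_engine(url: str):
    # pool_pre_ping discards stale TLS connections before a request uses them
    return create_engine(url, pool_pre_ping=True, connect_args=SSL_CONNECT_ARGS)
```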
Step 2: Define a schema for agent state and banking tasks
Your agents need a place to store task status, instrument metadata, risk flags, and execution traces. Keep the schema simple at first; you can normalize later.
```python
from sqlalchemy import (
    Column,
    Integer,
    String,
    JSON,
    DateTime,
    Text,
    func,
)
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class AgentTask(Base):
    __tablename__ = "agent_tasks"

    id = Column(Integer, primary_key=True)
    agent_name = Column(String(100), nullable=False)
    task_type = Column(String(100), nullable=False)
    payload = Column(JSON, nullable=False)
    status = Column(String(30), nullable=False, default="queued")
    result = Column(JSON, nullable=True)
    error = Column(Text, nullable=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
```
Create the table once during deployment:
```python
from db import engine
from models import Base

Base.metadata.create_all(bind=engine)
```
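The model can be sanity-checked without a live Postgres instance. This sketch mirrors the `AgentTask` model above and exercises it against in-memory SQLite (SQLite is used only for the smoke test; the `"queued"` default and the JSON payload behave the same way here):

```python
from sqlalchemy import (
    create_engine, Column, Integer, String, JSON, DateTime, Text, func,
)
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class AgentTask(Base):
    __tablename__ = "agent_tasks"

    id = Column(Integer, primary_key=True)
    agent_name = Column(String(100), nullable=False)
    task_type = Column(String(100), nullable=False)
    payload = Column(JSON, nullable=False)
    status = Column(String(30), nullable=False, default="queued")
    result = Column(JSON, nullable=True)
    error = Column(Text, nullable=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())


engine = create_engine("sqlite:///:memory:")  # SQLite only for this check
Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)

with Session() as db:
    db.add(AgentTask(
        agent_name="demo-agent",
        task_type="position_check",
        payload={"symbol": "AAPL", "quantity": 250},
    ))
    db.commit()
    # The "queued" default and the JSON payload round-trip without Postgres.
    row = db.query(AgentTask).first()
```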
Step 3: Build the FastAPI integration layer for agent orchestration
Expose endpoints that let one agent enqueue work and another agent consume results. In investment banking workflows, this is where you route tasks like pricing checks, deal memo extraction, or exposure review.
```python
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session

from db import get_db
from models import AgentTask

app = FastAPI(title="IB Multi-Agent Orchestrator")


class TaskIn(BaseModel):
    agent_name: str
    task_type: str
    payload: dict


@app.post("/tasks")
def create_task(task: TaskIn, db: Session = Depends(get_db)):
    row = AgentTask(
        agent_name=task.agent_name,
        task_type=task.task_type,
        payload=task.payload,
        status="queued",
    )
    db.add(row)
    db.commit()
    db.refresh(row)
    return {"id": row.id, "status": row.status}


@app.get("/tasks/{task_id}")
def get_task(task_id: int, db: Session = Depends(get_db)):
    row = db.query(AgentTask).filter(AgentTask.id == task_id).first()
    if not row:
        raise HTTPException(status_code=404, detail="Task not found")
    return {
        "id": row.id,
        "agent_name": row.agent_name,
        "task_type": row.task_type,
        "status": row.status,
        "payload": row.payload,
        "result": row.result,
        "error": row.error,
    }
```
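Pydantic already rejects malformed requests before they reach the handler; a self-contained check of the `TaskIn` model (mirrored from the endpoint code above):

```python
from pydantic import BaseModel, ValidationError


class TaskIn(BaseModel):
    agent_name: str
    task_type: str
    payload: dict


# A well-formed request body parses cleanly.
ok = TaskIn(
    agent_name="trade-review-agent",
    task_type="position_check",
    payload={"symbol": "AAPL", "quantity": 250},
)

# A non-dict payload is rejected; FastAPI turns this into a 422 response.
try:
    TaskIn(agent_name="x", task_type="y", payload="not a dict")
    errors = None
except ValidationError as exc:
    errors = exc.errors()
```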
Step 4: Connect an agent worker to PostgreSQL and the API
This worker polls PostgreSQL for queued tasks, processes them, then writes back results. In a real system the processing step may call internal pricing models or external market data services.
```python
import requests
from sqlalchemy.orm import Session

from db import SessionLocal
from models import AgentTask

API_BASE_URL = "http://localhost:8000"


def process_task(payload: dict) -> dict:
    # Replace with valuation / compliance / research logic.
    symbol = payload["symbol"]
    quantity = payload["quantity"]
    return {
        "symbol": symbol,
        "quantity": quantity,
        "status": "processed",
        "notional_estimate": quantity * 100.25,
    }


def run_worker():
    db: Session = SessionLocal()
    try:
        # with_for_update(skip_locked=True) lets several workers poll the same
        # table without two of them claiming the same row.
        task = (
            db.query(AgentTask)
            .filter(AgentTask.status == "queued")
            .order_by(AgentTask.created_at.asc())
            .with_for_update(skip_locked=True)
            .first()
        )
        if not task:
            return None
        task.status = "running"
        db.commit()
        try:
            result = process_task(task.payload)
        except Exception as exc:
            # Record the failure instead of leaving the task stuck in "running".
            task.status = "failed"
            task.error = str(exc)
            db.commit()
            raise
        task.result = result
        task.status = "done"
        db.commit()
        # Notify the orchestrator so the audit trail records the transition.
        requests.post(
            f"{API_BASE_URL}/tasks/{task.id}/events",
            json={"event": "completed", "result": result},
            timeout=5,
        )
        return result
    finally:
        db.close()
```
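With more than one worker, a plain `SELECT ... WHERE status = 'queued'` can hand the same row to two processes; Postgres's `FOR UPDATE SKIP LOCKED` is the standard fix. A sketch of a single-statement claim in raw SQL against the `agent_tasks` table above (this statement only runs on PostgreSQL):

```python
from sqlalchemy import text

# Atomically claim the oldest queued task: the inner SELECT locks one row and
# skips rows already locked by other workers; the UPDATE flips it to running.
CLAIM_SQL = text(
    """
    UPDATE agent_tasks
    SET status = 'running'
    WHERE id = (
        SELECT id
        FROM agent_tasks
        WHERE status = 'queued'
        ORDER BY created_at
        FOR UPDATE SKIP LOCKED
        LIMIT 1
    )
    RETURNING id, payload
    """
)


def claim_next_task(conn):
    # conn is a SQLAlchemy Connection bound to a Postgres engine
    return conn.execute(CLAIM_SQL).first()
```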
Step 5: Add an event endpoint for audit trails
Investment banking systems need traceability. Store state transitions so you can reconstruct who did what and when.
```python
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session

from db import get_db
from models import AgentTask

# Use a router and register it on the orchestrator app
# (app.include_router(router) in main.py) rather than constructing
# a second FastAPI instance.
router = APIRouter()


class TaskEvent(BaseModel):
    event: str
    result: dict | None = None


@router.post("/tasks/{task_id}/events")
def append_event(task_id: int, event: TaskEvent, db: Session = Depends(get_db)):
    task = db.query(AgentTask).filter(AgentTask.id == task_id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    if event.event == "completed":
        task.status = "done"
        task.result = event.result
    db.commit()
    return {"ok": True}
```
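The event endpoint updates the task row in place; to reconstruct who did what and when, an append-only events table is the usual companion. A hedged sketch (the `agent_task_events` table and its columns are assumptions, not part of the schema above), verified here against in-memory SQLite:

```python
from sqlalchemy import (
    create_engine, Column, Integer, String, JSON, DateTime, func,
)
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class AgentTaskEvent(Base):
    # One row per state transition; rows are inserted, never updated.
    __tablename__ = "agent_task_events"

    id = Column(Integer, primary_key=True)
    task_id = Column(Integer, nullable=False, index=True)
    event = Column(String(50), nullable=False)
    actor = Column(String(100), nullable=False)  # agent or user that acted
    detail = Column(JSON, nullable=True)
    recorded_at = Column(DateTime(timezone=True), server_default=func.now())


engine = create_engine("sqlite:///:memory:")  # SQLite only for the demo
Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)

with Session() as db:
    db.add(AgentTaskEvent(
        task_id=1,
        event="completed",
        actor="trade-review-agent",
        detail={"status": "processed"},
    ))
    db.commit()
    # Replaying a task's history is a single ordered query.
    history = db.query(AgentTaskEvent).filter_by(task_id=1).all()
```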
Testing the Integration
Run the API:
```shell
uvicorn main:app --reload --port 8000
```
Create a task:
```python
import requests

resp = requests.post(
    "http://localhost:8000/tasks",
    json={
        "agent_name": "trade-review-agent",
        "task_type": "position_check",
        "payload": {"symbol": "AAPL", "quantity": 250},
    },
    timeout=5,
)
print(resp.json())
```
Expected output:
```json
{"id": 1, "status": "queued"}
```
Then run the worker once and fetch the updated record:
```python
import requests

print(requests.get("http://localhost:8000/tasks/1", timeout=5).json())
```
Expected output:
```json
{
  "id": 1,
  "agent_name": "trade-review-agent",
  "task_type": "position_check",
  "status": "done",
  "payload": {"symbol": "AAPL", "quantity": 250},
  "result": {
    "symbol": "AAPL",
    "quantity": 250,
    "status": "processed",
    "notional_estimate": 25062.5
  },
  "error": null
}
```
Real-World Use Cases
- Trade pre-check agents that validate order size, concentration limits, and restricted lists before routing to execution.
- Research coordination systems where one agent summarizes filings while another stores extracted entities and confidence scores in Postgres.
- Compliance audit pipelines that persist every prompt input, model output, approval step, and exception for later review.
Keep learning
- The complete AI Agents Roadmap — my full 8-step breakdown
- Free: The AI Agent Starter Kit — PDF checklist + starter code
- Work with me — I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.