How to Integrate FastAPI for investment banking with PostgreSQL for multi-agent systems

By Cyprian AaronsUpdated 2026-04-21
fastapi-for-investment-bankingpostgresqlmulti-agent-systems

FastAPI gives you a clean API layer for investment banking workflows, and PostgreSQL gives your agent system durable state, auditability, and queryable memory. Put them together and you can build multi-agent systems that ingest market data, coordinate trade or research tasks, and persist every decision for compliance and replay.

Prerequisites

  • Python 3.11+
  • A running PostgreSQL instance
  • fastapi
  • uvicorn
  • psycopg[binary] or psycopg2-binary
  • sqlalchemy
  • pydantic
  • Access to your investment banking FastAPI service endpoints
  • A .env file with database credentials
  • Basic understanding of async Python and REST APIs

Integration Steps

  1. Set up the PostgreSQL connection layer

Use SQLAlchemy for connection management and keep the engine reusable across agents. For investment banking systems, this should be configured with SSL if you’re connecting to managed Postgres.

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+psycopg://bank_user:bank_pass@localhost:5432/agentdb"

engine = create_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
)

SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

def ping_db():
    with engine.connect() as conn:
        return conn.execute(text("SELECT 1")).scalar_one()
  1. Define a schema for agent state and banking tasks

Your agents need a place to store task status, instrument metadata, risk flags, and execution traces. Keep the schema simple at first; you can normalize later.

from sqlalchemy import (
    Column,
    Integer,
    String,
    JSON,
    DateTime,
    Text,
    func,
)
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class AgentTask(Base):
    __tablename__ = "agent_tasks"

    id = Column(Integer, primary_key=True)
    agent_name = Column(String(100), nullable=False)
    task_type = Column(String(100), nullable=False)
    payload = Column(JSON, nullable=False)
    status = Column(String(30), nullable=False, default="queued")
    result = Column(JSON, nullable=True)
    error = Column(Text, nullable=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

Create the table once during deployment:

from db import engine
from models import Base

Base.metadata.create_all(bind=engine)
  1. Build the FastAPI integration layer for agent orchestration

Expose endpoints that let one agent enqueue work and another agent consume results. In investment banking workflows, this is where you route tasks like pricing checks, deal memo extraction, or exposure review.

from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session

from db import get_db
from models import AgentTask

app = FastAPI(title="IB Multi-Agent Orchestrator")

class TaskIn(BaseModel):
    agent_name: str
    task_type: str
    payload: dict

@app.post("/tasks")
def create_task(task: TaskIn, db: Session = Depends(get_db)):
    row = AgentTask(
        agent_name=task.agent_name,
        task_type=task.task_type,
        payload=task.payload,
        status="queued",
    )
    db.add(row)
    db.commit()
    db.refresh(row)
    return {"id": row.id, "status": row.status}

@app.get("/tasks/{task_id}")
def get_task(task_id: int, db: Session = Depends(get_db)):
    row = db.query(AgentTask).filter(AgentTask.id == task_id).first()
    if not row:
        raise HTTPException(status_code=404, detail="Task not found")
    return {
        "id": row.id,
        "agent_name": row.agent_name,
        "task_type": row.task_type,
        "status": row.status,
        "payload": row.payload,
        "result": row.result,
        "error": row.error,
    }
  1. Connect an agent worker to PostgreSQL and the API

This worker polls PostgreSQL for queued tasks, processes them, then writes back results. In a real system the processing step may call internal pricing models or external market data services.

import requests
from sqlalchemy.orm import Session

from db import SessionLocal
from models import AgentTask

API_BASE_URL = "http://localhost:8000"

def process_task(payload: dict) -> dict:
    # Replace with valuation / compliance / research logic.
    symbol = payload["symbol"]
    quantity = payload["quantity"]
    return {
        "symbol": symbol,
        "quantity": quantity,
        "status": "processed",
        "notional_estimate": quantity * 100.25,
    }

def run_worker():
    db: Session = SessionLocal()
    try:
        task = (
            db.query(AgentTask)
            .filter(AgentTask.status == "queued")
            .order_by(AgentTask.created_at.asc())
            .first()
        )

        if not task:
            return None

        task.status = "running"
        db.commit()

        result = process_task(task.payload)

        task.result = result
        task.status = "done"
        db.commit()

        requests.post(
            f"{API_BASE_URL}/tasks/{task.id}/events",
            json={"event": "completed", "result": result},
            timeout=5,
        )

        return result
    finally:
     	db.close()
  1. Add an event endpoint for audit trails

Investment banking systems need traceability. Store state transitions so you can reconstruct who did what and when.

from fastapi import FastAPI, Depends
from pydantic import BaseModel
from sqlalchemy.orm import Session

from db import get_db
from models import AgentTask

app = FastAPI()

class TaskEvent(BaseModel):
   event: str
   result: dict | None = None

@app.post("/tasks/{task_id}/events")
def append_event(task_id: int, event: TaskEvent, db: Session = Depends(get_db)):
   task = db.query(AgentTask).filter(AgentTask.id == task_id).first()
   if not task:
       return {"ok": False}

   if event.event == "completed":
       task.status = "done"
       task.result = event.result

   db.commit()
   return {"ok": True}

Testing the Integration

Run the API:

uvicorn main:app --reload --port 8000

Create a task:

import requests

resp = requests.post(
   "http://localhost:8000/tasks",
   json={
      "agent_name": "trade-review-agent",
      "task_type": "position_check",
      "payload": {"symbol": "AAPL", "quantity": 250},
   },
   timeout=5,
)

print(resp.json())

Expected output:

{"id": 1,"status":"queued"}

Then run the worker once and fetch the updated record:

import requests

requests.get("http://localhost:8000/tasks/1", timeout=5).json()

Expected output:

{
  "id": 1,
  "agent_name": "trade-review-agent",
  "task_type": "position_check",
  "status": "done",
  "payload": {"symbol":"AAPL","quantity":250},
  "result":{"symbol":"AAPL","quantity":250,"status":"processed","notional_estimate":25062.5},
  "error": null
}

Real-World Use Cases

  • Trade pre-check agents that validate order size, concentration limits, and restricted lists before routing to execution.
  • Research coordination systems where one agent summarizes filings while another stores extracted entities and confidence scores in Postgres.
  • Compliance audit pipelines that persist every prompt input, model output, approval step, and exception for later review.

Keep learning

By Cyprian Aarons, AI Consultant at Topiax.

Want the complete 8-step roadmap?

Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.

Get the Starter Kit

Related Guides