How to Integrate FastAPI for payments with PostgreSQL for RAG
If you’re building an AI agent that takes payments and answers questions from your own data, you need two things to work reliably: a payment API for charging users and PostgreSQL as the retrieval layer for RAG. FastAPI gives you a clean way to expose payment endpoints, while PostgreSQL stores vectors, metadata, and transaction state in one place.
This setup is common in subscription agents, paid document assistants, and internal copilots where access depends on billing status. The pattern is simple: process payment events through FastAPI, persist user and retrieval state in PostgreSQL, then use that state to control what the agent can retrieve and answer.
Prerequisites
- •Python 3.10+
- •FastAPI installed
- •
uvicorninstalled - •PostgreSQL 14+
- •
psycopgorpsycopg2-binary - •
pgvectorextension enabled in PostgreSQL - •A payment provider account with API keys
- •Basic knowledge of REST APIs and SQL
- •A
.envfile for secrets like:- •
DATABASE_URL - •
PAYMENT_API_KEY - •
PAYMENT_WEBHOOK_SECRET
- •
Integration Steps
- •Set up PostgreSQL for RAG and billing state
You want one database that can store both payment records and embeddings. For RAG, the simplest production-friendly route is PostgreSQL plus pgvector.
import os
import psycopg
DATABASE_URL = os.getenv("DATABASE_URL")
schema_sql = """
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE IF NOT EXISTS customers (
id SERIAL PRIMARY KEY,
email TEXT UNIQUE NOT NULL,
payment_customer_id TEXT UNIQUE,
active_subscription BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS documents (
id SERIAL PRIMARY KEY,
customer_id INTEGER REFERENCES customers(id),
content TEXT NOT NULL,
embedding VECTOR(1536),
source TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS payments (
id SERIAL PRIMARY KEY,
customer_id INTEGER REFERENCES customers(id),
provider_payment_id TEXT UNIQUE NOT NULL,
amount_cents INTEGER NOT NULL,
currency TEXT NOT NULL DEFAULT 'usd',
status TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
"""
with psycopg.connect(DATABASE_URL) as conn:
with conn.cursor() as cur:
cur.execute(schema_sql)
conn.commit()
- •Create a FastAPI payment endpoint
FastAPI will handle your checkout flow or payment intent creation. In this example, the app calls a provider SDK method to create a payment session.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import os
app = FastAPI()
class CheckoutRequest(BaseModel):
email: str
amount_cents: int
# Example using Stripe-style SDK calls.
# Replace with your actual provider SDK if different.
import stripe
stripe.api_key = os.getenv("PAYMENT_API_KEY")
@app.post("/payments/checkout")
def create_checkout_session(payload: CheckoutRequest):
try:
session = stripe.checkout.Session.create(
mode="payment",
customer_email=payload.email,
line_items=[{
"price_data": {
"currency": "usd",
"product_data": {"name": "RAG Access"},
"unit_amount": payload.amount_cents,
},
"quantity": 1,
}],
success_url="https://yourapp.com/success",
cancel_url="https://yourapp.com/cancel",
)
return {"checkout_url": session.url, "session_id": session.id}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
- •Persist payment events into PostgreSQL
Your webhook should be the source of truth for subscription status. When the provider confirms payment, update the customer record and write the transaction row.
from fastapi import Request
import psycopg
@app.post("/payments/webhook")
async def payment_webhook(request: Request):
body = await request.body()
signature = request.headers.get("Stripe-Signature")
try:
event = stripe.Webhook.construct_event(
payload=body,
sig_header=signature,
secret=os.getenv("PAYMENT_WEBHOOK_SECRET"),
)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Webhook error: {e}")
if event["type"] == "checkout.session.completed":
session = event["data"]["object"]
email = session["customer_email"]
provider_payment_id = session["id"]
with psycopg.connect(DATABASE_URL) as conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO customers (email, payment_customer_id, active_subscription)
VALUES (%s, %s, TRUE)
ON CONFLICT (email)
DO UPDATE SET active_subscription = TRUE;
""",
(email, session.get("customer"),),
)
cur.execute(
"""
INSERT INTO payments (customer_id, provider_payment_id, amount_cents, currency, status)
SELECT id, %s, %s, %s, %s
FROM customers WHERE email = %s;
""",
(
provider_payment_id,
session["amount_total"],
session["currency"],
"paid",
email,
),
)
conn.commit()
return {"received": True}
- •Store RAG embeddings only for paying users
This is where payments and retrieval meet. Before inserting documents into your vector table, check subscription status from PostgreSQL.
from typing import List
def is_active_customer(email: str) -> bool:
with psycopg.connect(DATABASE_URL) as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT active_subscription FROM customers WHERE email = %s;",
(email,),
)
row = cur.fetchone()
return bool(row and row[0])
def save_document(email: str, content: str, embedding: List[float], source: str):
if not is_active_customer(email):
raise ValueError("Customer does not have an active subscription")
with psycopg.connect(DATABASE_URL) as conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO documents (customer_id, content, embedding, source)
SELECT id, %s::text, %s::vector, %s
FROM customers WHERE email = %s;
""",
(content, embedding, source, email),
)
conn.commit()
- •Query PostgreSQL for RAG at answer time
When the agent receives a question, fetch the most relevant chunks from PostgreSQL using vector similarity. Then gate access by subscription before returning results.
def retrieve_context(email: str, query_embedding: List[float], limit: int = 3):
with psycopg.connect(DATABASE_URL) as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT d.content
FROM documents d
JOIN customers c ON c.id = d.customer_id
WHERE c.email = %s AND c.active_subscription = TRUE
ORDER BY d.embedding <-> %s::vector
LIMIT %s;
""",
(email, query_embedding, limit),
)
return [row[0] for row in cur.fetchall()]
Testing the Integration
Use a quick smoke test to confirm both sides are wired correctly: create a customer record after payment and retrieve protected RAG content.
test_email = "alice@example.com"
print("Subscription active:", is_active_customer(test_email))
try:
save_document(
email=test_email,
content="Policy renewal requires identity verification.",
embedding=[0.01] * 1536,
source="policy.md",
)
print("Document saved")
except Exception as e:
print("Save failed:", e)
context = retrieve_context(test_email, [0.01] * 1536)
print("Retrieved context:", context)
Expected output:
Subscription active: True
Document saved
Retrieved context: ['Policy renewal requires identity verification.']
Real-World Use Cases
- •
Paid document assistant
- •Users pay through FastAPI endpoints before they can upload or query private knowledge bases stored in PostgreSQL.
- •
Subscription-based support agent
- •The agent checks
active_subscriptionin PostgreSQL before answering product or compliance questions from indexed internal docs.
- •The agent checks
- •
Usage-based enterprise copilot
- •Payments are recorded per invoice cycle while PostgreSQL tracks embeddings per tenant for isolated RAG retrieval.
Keep learning
- •The complete AI Agents Roadmap — my full 8-step breakdown
- •Free: The AI Agent Starter Kit — PDF checklist + starter code
- •Work with me — I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.
Want the complete 8-step roadmap?
Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.
Get the Starter Kit