How to Integrate FastAPI for payments with PostgreSQL for RAG

By Cyprian Aarons. Updated 2026-04-21

If you’re building an AI agent that takes payments and answers questions from your own data, you need two things to work reliably: a payment API for charging users and PostgreSQL as the retrieval layer for RAG. FastAPI gives you a clean way to expose payment endpoints, while PostgreSQL stores vectors, metadata, and transaction state in one place.

This setup is common in subscription agents, paid document assistants, and internal copilots where access depends on billing status. The pattern is simple: process payment events through FastAPI, persist user and retrieval state in PostgreSQL, then use that state to control what the agent can retrieve and answer.

Prerequisites

  • Python 3.10+
  • FastAPI installed
  • uvicorn installed
  • PostgreSQL 14+
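  • psycopg 3 (the examples use import psycopg; psycopg2-binary would need import psycopg2 and small changes)
  • pgvector extension available in PostgreSQL
  • A payment provider account with API keys, plus its Python SDK (the examples use stripe)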
  • Basic knowledge of REST APIs and SQL
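  • A .env file for secrets, loaded into the environment before the app starts (os.getenv does not read .env on its own), with entries like: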
    • DATABASE_URL
    • PAYMENT_API_KEY
    • PAYMENT_WEBHOOK_SECRET
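
Since every later step depends on these three values, it can help to fail fast at startup when one is missing. The following is a minimal sketch, not part of the original setup: load_settings and MissingSettingError are hypothetical names, and it assumes the variables have already been exported or loaded from .env by whatever tool you use.

```python
import os

# The three secrets listed in the prerequisites above.
REQUIRED_SETTINGS = ("DATABASE_URL", "PAYMENT_API_KEY", "PAYMENT_WEBHOOK_SECRET")


class MissingSettingError(RuntimeError):
    """Raised when a required environment variable is absent or empty."""


def load_settings(env=None):
    """Return the required settings as a dict, or raise if any are missing.

    `env` defaults to os.environ; passing a plain dict makes this easy to test.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_SETTINGS if not env.get(name)]
    if missing:
        raise MissingSettingError("Missing settings: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_SETTINGS}
```

Calling load_settings() once at import time surfaces a misconfigured deployment immediately, instead of as a failed database connection or a rejected webhook later.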

Integration Steps

  1. Set up PostgreSQL for RAG and billing state

You want one database that can store both payment records and embeddings. For RAG, the simplest production-friendly route is PostgreSQL plus pgvector.

import os
import psycopg

DATABASE_URL = os.getenv("DATABASE_URL")

schema_sql = """
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS customers (
    id SERIAL PRIMARY KEY,
    email TEXT UNIQUE NOT NULL,
    payment_customer_id TEXT UNIQUE,
    active_subscription BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS documents (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    content TEXT NOT NULL,
    embedding VECTOR(1536),
    source TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS payments (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    provider_payment_id TEXT UNIQUE NOT NULL,
    amount_cents INTEGER NOT NULL,
    currency TEXT NOT NULL DEFAULT 'usd',
    status TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
"""

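# psycopg 3 accepts several statements in one execute() call when no parameters are passed.
with psycopg.connect(DATABASE_URL) as conn:
    with conn.cursor() as cur:
        cur.execute(schema_sql)
    # Leaving the connection block also commits; the explicit call keeps the intent visible.
    conn.commit()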

  2. Create a FastAPI payment endpoint

FastAPI will handle your checkout flow or payment intent creation. In this example, the app calls a provider SDK method to create a payment session.

import os

import stripe
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

# Example using Stripe-style SDK calls.
# Replace with your actual provider SDK if different.
stripe.api_key = os.getenv("PAYMENT_API_KEY")

app = FastAPI()

class CheckoutRequest(BaseModel):
    email: str
    # Amount in the smallest currency unit (cents for USD); must be positive.
    amount_cents: int = Field(gt=0)

@app.post("/payments/checkout")
def create_checkout_session(payload: CheckoutRequest):
    try:
        session = stripe.checkout.Session.create(
            mode="payment",
            customer_email=payload.email,
            line_items=[{
                "price_data": {
                    "currency": "usd",
                    "product_data": {"name": "RAG Access"},
                    "unit_amount": payload.amount_cents,
                },
                "quantity": 1,
            }],
            success_url="https://yourapp.com/success",
            cancel_url="https://yourapp.com/cancel",
        )
        return {"checkout_url": session.url, "session_id": session.id}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
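
The endpoint expects amount_cents as an integer, so any price that starts life as a decimal string needs converting without float rounding errors. Below is a small sketch of one way to do that; to_cents is a hypothetical helper, and it assumes a two-decimal currency such as USD (zero-decimal currencies would need different handling).

```python
from decimal import Decimal, ROUND_HALF_UP


def to_cents(amount: str) -> int:
    """Convert a decimal amount such as "19.99" into integer cents.

    Decimal avoids binary float artifacts, e.g. int(19.99 * 100) gives 1998.
    """
    value = Decimal(amount)
    if value <= 0:
        raise ValueError("amount must be positive")
    # Round half up to the nearest whole cent.
    cents = (value * 100).quantize(Decimal("1"), rounding=ROUND_HALF_UP)
    return int(cents)
```

For example, to_cents("19.99") returns 1999, which can be sent as amount_cents in the checkout request.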

  3. Persist payment events into PostgreSQL

Your webhook should be the source of truth for subscription status. When the provider confirms payment, update the customer record and write the transaction row.

from fastapi import Request
import psycopg

@app.post("/payments/webhook")
async def payment_webhook(request: Request):
    body = await request.body()
    signature = request.headers.get("Stripe-Signature")

    try:
        event = stripe.Webhook.construct_event(
            payload=body,
            sig_header=signature,
            secret=os.getenv("PAYMENT_WEBHOOK_SECRET"),
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Webhook error: {e}")

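    if event["type"] == "checkout.session.completed":
        session = event["data"]["object"]
        email = session["customer_email"]
        provider_payment_id = session["id"]

        # Delayed payment methods can complete checkout before funds settle,
        # so only grant access once the session is actually paid.
        if session["payment_status"] != "paid":
            return {"received": True}

        with psycopg.connect(DATABASE_URL) as conn:
            with conn.cursor() as cur:
                # Create the customer, or reactivate an existing one and keep
                # the provider's customer id up to date.
                cur.execute(
                    """
                    INSERT INTO customers (email, payment_customer_id, active_subscription)
                    VALUES (%s, %s, TRUE)
                    ON CONFLICT (email)
                    DO UPDATE SET
                        active_subscription = TRUE,
                        payment_customer_id = COALESCE(EXCLUDED.payment_customer_id, customers.payment_customer_id);
                    """,
                    (email, session.get("customer")),
                )

                # Providers retry webhooks, so a repeated event must not fail
                # on the UNIQUE provider_payment_id constraint.
                cur.execute(
                    """
                    INSERT INTO payments (customer_id, provider_payment_id, amount_cents, currency, status)
                    SELECT id, %s, %s, %s, %s
                    FROM customers WHERE email = %s
                    ON CONFLICT (provider_payment_id) DO NOTHING;
                    """,
                    (
                        provider_payment_id,
                        session["amount_total"],
                        session["currency"],
                        "paid",
                        email,
                    ),
                )
            conn.commit()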

    return {"received": True}
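
To make the signature check in the webhook less of a black box, here is a simplified standard-library sketch of the scheme Stripe documents for its Stripe-Signature header: a t=<timestamp>,v1=<hex digest> pair, where the digest is an HMAC-SHA256 over "<timestamp>.<raw body>". verify_signature is a hypothetical name, other providers sign differently, and in production the SDK call shown above remains the safer choice.

```python
import hashlib
import hmac
import time


def verify_signature(payload: bytes, header: str, secret: str,
                     tolerance: int = 300, now=None) -> bool:
    """Check a 't=<unix time>,v1=<hex digest>' style signature header."""
    pairs = [item.strip().split("=", 1) for item in header.split(",") if "=" in item]
    timestamp = next((value for key, value in pairs if key == "t"), None)
    candidates = [value for key, value in pairs if key == "v1"]
    if timestamp is None or not candidates:
        return False
    if not (timestamp.isascii() and timestamp.isdigit()):
        return False

    # Reject stale events to limit replay of captured requests.
    current = time.time() if now is None else now
    if abs(current - int(timestamp)) > tolerance:
        return False

    # The signed message is the timestamp, a dot, and the raw request body.
    signed_payload = timestamp.encode() + b"." + payload
    expected = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()

    # compare_digest avoids leaking information through timing differences.
    return any(
        hmac.compare_digest(expected.encode(), candidate.encode())
        for candidate in candidates
    )
```

The key detail is that the raw bytes from request.body() are signed, which is why the webhook reads the body before any JSON parsing.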

  4. Store RAG embeddings only for paying users

This is where payments and retrieval meet. Before inserting documents into your vector table, check subscription status from PostgreSQL.

from typing import List

def is_active_customer(email: str) -> bool:
    with psycopg.connect(DATABASE_URL) as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT active_subscription FROM customers WHERE email = %s;",
                (email,),
            )
            row = cur.fetchone()
            return bool(row and row[0])

def save_document(email: str, content: str, embedding: List[float], source: str):
    if not is_active_customer(email):
        raise ValueError("Customer does not have an active subscription")

    with psycopg.connect(DATABASE_URL) as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO documents (customer_id, content, embedding, source)
                SELECT id, %s::text, %s::vector, %s
                FROM customers WHERE email = %s;
                """,
                (content, embedding, source, email),
            )
        conn.commit()
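
Because the documents table declares VECTOR(1536), an embedding of the wrong length is rejected by the database at insert time. A small pre-check can produce a clearer error earlier. This sketch is an optional addition: to_vector_literal is a hypothetical helper that validates the embedding and formats it in pgvector's text form ('[0.1,0.2,...]'), which could be passed in place of the raw list for the %s::vector parameter.

```python
import math
from typing import Sequence

EMBEDDING_DIM = 1536  # matches VECTOR(1536) in the documents table


def to_vector_literal(embedding: Sequence[float], dim: int = EMBEDDING_DIM) -> str:
    """Validate an embedding and format it as pgvector text input."""
    if len(embedding) != dim:
        raise ValueError(f"expected {dim} dimensions, got {len(embedding)}")
    values = [float(x) for x in embedding]
    # Non-finite values cannot be stored in a vector column.
    if not all(math.isfinite(x) for x in values):
        raise ValueError("embedding contains NaN or infinity")
    return "[" + ",".join(repr(x) for x in values) + "]"
```

Whether to send a list or a text literal is a design choice; the literal form has the advantage of not depending on how the driver adapts Python lists.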

  5. Query PostgreSQL for RAG at answer time

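When the agent receives a question, fetch the most relevant chunks from PostgreSQL using vector similarity. The query gates access in the same statement: it only returns rows belonging to a customer whose active_subscription is TRUE, ordered by pgvector's <-> operator (Euclidean distance, smallest first).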

def retrieve_context(email: str, query_embedding: List[float], limit: int = 3):
    with psycopg.connect(DATABASE_URL) as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT d.content
                FROM documents d
                JOIN customers c ON c.id = d.customer_id
                WHERE c.email = %s AND c.active_subscription = TRUE
                ORDER BY d.embedding <-> %s::vector
                LIMIT %s;
                """,
                (email, query_embedding, limit),
            )
            return [row[0] for row in cur.fetchall()]
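
To see what the ORDER BY ... <-> ... LIMIT clause computes, the same ranking can be sketched in plain Python over an in-memory list. This is only an illustration of the ordering logic, with hypothetical names (l2_distance, top_k); the database version is what you would run in practice, since it can use an index.

```python
import math
from typing import List, Sequence, Tuple


def l2_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Euclidean distance, the metric behind pgvector's <-> operator."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def top_k(query: Sequence[float],
          rows: List[Tuple[str, Sequence[float]]],
          limit: int = 3) -> List[str]:
    """Return the content of the `limit` rows closest to the query vector."""
    ranked = sorted(rows, key=lambda row: l2_distance(query, row[1]))
    return [content for content, _ in ranked[:limit]]
```

If your embedding model is tuned for cosine similarity, pgvector's <=> operator is the usual alternative; for unit-length vectors both orderings agree.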

Testing the Integration

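Use a quick smoke test to confirm both sides are wired correctly. It assumes the webhook has already recorded a payment for alice@example.com, so the customer row exists with an active subscription; the test then saves and retrieves protected RAG content.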

test_email = "alice@example.com"

print("Subscription active:", is_active_customer(test_email))

try:
    save_document(
        email=test_email,
        content="Policy renewal requires identity verification.",
        embedding=[0.01] * 1536,
        source="policy.md",
    )
    print("Document saved")
except Exception as e:
    print("Save failed:", e)

context = retrieve_context(test_email, [0.01] * 1536)
print("Retrieved context:", context)

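Expected output on the first run (each rerun inserts the document again, so the retrieved list will then contain duplicates):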

Subscription active: True
Document saved
Retrieved context: ['Policy renewal requires identity verification.']

Real-World Use Cases

  • Paid document assistant
    • Users pay through FastAPI endpoints before they can upload or query private knowledge bases stored in PostgreSQL.
  • Subscription-based support agent
    • The agent checks active_subscription in PostgreSQL before answering product or compliance questions from indexed internal docs.
  • Usage-based enterprise copilot
    • Payments are recorded per invoice cycle while PostgreSQL tracks embeddings per tenant for isolated RAG retrieval.


By Cyprian Aarons, AI Consultant at Topiax.
