AutoGen Tutorial (Python): building a RAG pipeline for advanced developers
This tutorial shows you how to build a retrieval-augmented generation pipeline with AutoGen in Python, using a local document index and an assistant agent that answers grounded questions from retrieved context. You need this when plain prompting is not enough and you want the model to answer from your own data instead of hallucinating from memory.
What You'll Need
- Python 3.10+
- autogen-agentchat
- autogen-ext
- chromadb
- sentence-transformers
- An OpenAI API key set as OPENAI_API_KEY
- A small document set to index, such as policy docs, product specs, or support notes
Install the packages:
pip install "autogen-agentchat" "autogen-ext[openai]" chromadb sentence-transformers
Step-by-Step
- Start by creating a small local knowledge base. For production systems you would usually chunk documents before indexing, but for this tutorial we’ll use short text snippets so the pipeline stays easy to run and inspect (a minimal chunking sketch follows the code below).
from pathlib import Path

docs = {
    "claims_policy.txt": """
Claims must be submitted within 30 days of incident date.
The adjuster may request supporting receipts and photos.
High-value claims require manual review by a senior analyst.
""",
    "underwriting_notes.txt": """
Policies with prior fraud flags require enhanced verification.
Premium adjustments must be approved by underwriting.
Coverage exclusions are listed in the policy schedule.
""",
}

data_dir = Path("rag_docs")
data_dir.mkdir(exist_ok=True)

for name, content in docs.items():
    (data_dir / name).write_text(content.strip(), encoding="utf-8")

print(f"Wrote {len(docs)} documents to {data_dir}")
- Next, build a vector store and index the documents with embeddings. This example uses Chroma locally so you can run it without standing up extra infrastructure.
import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_or_create_collection(
    name="policy_docs",
    embedding_function=SentenceTransformerEmbeddingFunction(
        model_name="all-MiniLM-L6-v2"
    ),
)

documents = []
metadatas = []
ids = []
for i, path in enumerate(sorted(data_dir.glob("*.txt"))):
    text = path.read_text(encoding="utf-8")
    documents.append(text)
    metadatas.append({"source": path.name})
    ids.append(f"doc-{i}")

collection.upsert(documents=documents, metadatas=metadatas, ids=ids)
print("Indexed documents:", collection.count())
- Now create a retrieval function that pulls the top matching chunks for each question. The key pattern here is to keep retrieval outside the LLM call so you can inspect, test, and swap components independently.
def retrieve_context(query: str, k: int = 2) -> str:
    results = collection.query(
        query_texts=[query],
        n_results=k,
        include=["documents", "metadatas", "distances"],
    )
    blocks = []
    for doc, meta, dist in zip(
        results["documents"][0],
        results["metadatas"][0],
        results["distances"][0],
    ):
        blocks.append(f"Source: {meta['source']} | Distance: {dist:.4f}\n{doc}")
    return "\n\n---\n\n".join(blocks)

print(retrieve_context("What happens if a claim is late?"))
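Since retrieval is a plain function, you can test it without touching the model at all. A minimal check, assuming the two sample files above, verifies that the top hit for a claims question actually comes from claims_policy.txt:

def test_retrieval_routes_to_claims_doc() -> None:
    # The top result for a claims question should come from the claims file.
    results = collection.query(query_texts=["deadline for submitting a claim"], n_results=1)
    top_source = results["metadatas"][0][0]["source"]
    assert top_source == "claims_policy.txt", top_source

test_retrieval_routes_to_claims_doc()
print("retrieval smoke test passed")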
- Wire retrieval into AutoGen by giving the assistant a strict system prompt and feeding it the retrieved context with each question. This keeps the model grounded and makes its behavior predictable enough to audit in enterprise settings.
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")

assistant = AssistantAgent(
    name="rag_assistant",
    model_client=model_client,
    system_message=(
        "Answer only using the provided context. "
        "If the answer is not in the context, say you don't know."
    ),
)

async def answer_question(question: str) -> str:
    context = retrieve_context(question)
    prompt = f"""Context:
{context}

Question:
{question}
"""
    result = await assistant.run(task=prompt)
    return result.messages[-1].content

print(asyncio.run(answer_question("What is required for high-value claims?")))
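Each asyncio.run call spins up a fresh event loop, which is fine for a script. If you prefer a single async entrypoint that also releases the model client when you are done, a sketch like the one below works; the close() call follows AutoGen's documented cleanup pattern, and the question list is just an example.

async def main() -> None:
    # Ask a few questions on one event loop, then release the client's resources.
    for q in [
        "What is required for high-value claims?",
        "Who approves premium adjustments?",
    ]:
        print(q, "->", await answer_question(q))
    await model_client.close()

# asyncio.run(main())  # alternative entrypoint to the ask() wrapper below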
- Add a thin wrapper so your RAG pipeline behaves like a real service entrypoint. In practice this is where you would add logging, tracing, request IDs, and fallback logic.
def ask(question: str) -> None:
    response = asyncio.run(answer_question(question))
    print("\nQUESTION:", question)
    print("ANSWER:", response)

if __name__ == "__main__":
    ask("What happens if a claim is submitted after 30 days?")
Testing It
Run the script and ask questions that are explicitly covered by your documents, such as claim deadlines or underwriting approval rules. You should see answers that quote or closely reflect the indexed text rather than generic model output.
Then ask something outside the corpus, like “What is your refund policy?” The assistant should refuse to invent an answer and say it does not know based on the provided context.
If you want stronger validation, log the retrieved sources alongside each answer and verify that the top results actually contain the facts used in the response. That matters more than model fluency when you’re building RAG for regulated workflows.
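One way to do that, as a sketch built on the functions above (answer_with_sources is a hypothetical helper, not an AutoGen API), is to return the retrieved sources alongside the answer so you can eyeball or assert groundedness:

async def answer_with_sources(question: str, k: int = 2) -> tuple[str, list[str]]:
    # Query the vector store once so the answer and the logged sources match.
    results = collection.query(query_texts=[question], n_results=k)
    sources = [meta["source"] for meta in results["metadatas"][0]]
    context = "\n\n---\n\n".join(results["documents"][0])
    prompt = f"Context:\n{context}\n\nQuestion:\n{question}"
    result = await assistant.run(task=prompt)
    return result.messages[-1].content, sources

answer, sources = asyncio.run(answer_with_sources("What is required for high-value claims?"))
print("SOURCES:", sources, "\nANSWER:", answer)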
Next Steps
- Replace whole-file indexing with proper chunking plus overlap so retrieval quality improves on longer documents.
- Add reranking before generation if your corpus is large or semantically dense (see the sketch after this list).
- Wrap retrieval and answering in an AutoGen multi-agent workflow if you need query rewriting, verification, or citation checking.
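If you do add reranking, a cross-encoder is a common choice. A minimal sketch with sentence-transformers might look like the following; the model name is one widely used public reranker and rerank_context is a hypothetical helper, so treat both as assumptions to adapt.

from sentence_transformers import CrossEncoder

# A cross-encoder scores (query, passage) pairs jointly, which is slower but
# usually more accurate than the bi-encoder used for first-pass retrieval.
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank_context(query: str, k: int = 10, top_n: int = 2) -> str:
    results = collection.query(query_texts=[query], n_results=k)
    passages = results["documents"][0]
    scores = reranker.predict([(query, passage) for passage in passages])
    ranked = sorted(zip(scores, passages), key=lambda pair: pair[0], reverse=True)
    return "\n\n---\n\n".join(passage for _, passage in ranked[:top_n])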
Keep learning
- The complete AI Agents Roadmap — my full 8-step breakdown
- Free: The AI Agent Starter Kit — PDF checklist + starter code
- Work with me — I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.
Want the complete 8-step roadmap?
Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.