CrewAI Tutorial (Python): building a RAG pipeline for advanced developers
This tutorial builds a production-shaped Retrieval-Augmented Generation pipeline with CrewAI in Python: ingest documents, retrieve relevant chunks, and answer questions with grounded context. You need this when a plain LLM starts hallucinating on private or domain-specific data and you want an agent workflow that can fetch the right evidence before generating an answer.
What You'll Need
- Python 3.10+
- crewai
- crewai-tools
- langchain-community
- langchain-openai
- faiss-cpu
- python-dotenv
- An OpenAI API key in OPENAI_API_KEY
- A small local corpus in .txt files, or any text you can load into documents
Install the packages:
pip install crewai crewai-tools langchain-community langchain-openai faiss-cpu python-dotenv
Step-by-Step
- Start by loading your documents, splitting them into chunks, and building a FAISS vector store. This is the retrieval layer your agents will query later, and it should be deterministic and easy to rebuild.
import os
from dotenv import load_dotenv
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
load_dotenv()
docs = []
for filename in ["docs/policy.txt", "docs/claims.txt"]:
    docs.extend(TextLoader(filename, encoding="utf-8").load())
chunks = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=120).split_documents(docs)
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_documents(chunks, embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
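The chunk_overlap setting matters here: facts that straddle a chunk boundary survive in at least one chunk. As a rough plain-Python illustration of the windowing (the real RecursiveCharacterTextSplitter additionally prefers paragraph and sentence boundaries, so this is a simplification):

```python
def split_plain(text: str, chunk_size: int = 800, chunk_overlap: int = 120) -> list[str]:
    """Naive fixed-window splitter: each chunk starts chunk_size - chunk_overlap
    characters after the previous one, so neighbouring chunks share
    chunk_overlap characters. Illustration only -- the real splitter also
    respects text boundaries instead of cutting mid-sentence."""
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, max(len(text) - chunk_overlap, 1), step)]

chunks = split_plain("x" * 2000)
# 2000 chars with step 680 -> chunks starting at offsets 0, 680, 1360
```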
- Wrap retrieval in a tool so CrewAI agents can call it cleanly. Keep the tool narrow: given a question, return only the top evidence snippets, not a full essay.
from crewai_tools import tool

@tool("retrieve_context")
def retrieve_context(question: str) -> str:
    """Return the most relevant document chunks for a question."""
    docs = retriever.invoke(question)  # invoke() supersedes the deprecated get_relevant_documents()
    if not docs:
        return "No relevant context found."
    return "\n\n".join(
        f"[Source {i+1}] {doc.page_content[:1200]}"
        for i, doc in enumerate(docs)
    )

# The decorator wraps the function in a tool object, so call .run() to test it directly.
print(retrieve_context.run("What is the claims escalation threshold?"))
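Because vector search depends on an external embedding model, unit tests around the tool layer are easier with a deterministic stand-in. A minimal keyword-overlap retriever (an illustration of the idea, not part of CrewAI or LangChain) might look like:

```python
import re

def keyword_retrieve(question: str, chunks: list[str], k: int = 4) -> list[str]:
    """Rank chunks by how many words they share with the question.
    Deterministic stand-in for vector search in tests; not semantic."""
    def words(text: str) -> set[str]:
        return set(re.findall(r"[a-z0-9]+", text.lower()))
    q = words(question)
    ranked = sorted(chunks, key=lambda c: len(q & words(c)), reverse=True)
    return ranked[:k]

corpus = [
    "Claims above $10,000 must be escalated to a senior adjuster.",
    "Office hours are 9am to 5pm on weekdays.",
    "Escalated claims require a second review within 48 hours.",
]
top = keyword_retrieve("When should a claim be escalated?", corpus, k=2)
# the two claims-related chunks rank above the office-hours one
```

Swapping this in for `retriever` lets you exercise the tool's formatting and empty-result paths without an API key.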
- Define two agents: one to retrieve evidence and one to answer using only that evidence. The separation matters because it gives you better control over grounding and makes debugging much easier than stuffing everything into one prompt.
from crewai import Agent

retrieval_agent = Agent(
    role="Retriever",
    goal="Find the most relevant context for the user's question.",
    backstory="You are precise and only return evidence that helps answer the query.",
    tools=[retrieve_context],
    verbose=True,
)

answer_agent = Agent(
    role="RAG Answerer",
    goal="Answer questions using retrieved context only.",
    backstory="You write concise answers grounded in source material.",
    verbose=True,
)
- Create tasks that enforce the workflow: first gather context, then generate the final answer from that context. In a real system you would also add citations, refusal rules, and output validation here.
from crewai import Task

retrieve_task = Task(
    description="Retrieve the best context for this question: {question}",
    expected_output="A compact set of relevant excerpts from the knowledge base.",
    agent=retrieval_agent,
)

answer_task = Task(
    description=(
        "Using only the retrieved context below, answer the question clearly.\n"
        "Question: {question}\n"
        "Context: {context}"
    ),
    expected_output="A grounded final answer with no unsupported claims.",
    agent=answer_agent,
)
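CrewAI substitutes the `inputs` you pass to `kickoff` into the `{question}` and `{context}` placeholders of each task description, much like Python's `str.format`. A quick illustration of what the answer task's prompt becomes (the filled-in values here are made up):

```python
description = (
    "Using only the retrieved context below, answer the question clearly.\n"
    "Question: {question}\n"
    "Context: {context}"
)

# What CrewAI effectively does with kickoff(inputs={...}) before prompting the agent:
prompt = description.format(
    question="When should a claim be escalated?",
    context="[Source 1] Claims above $10,000 must be escalated.",
)
```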
- Run the pipeline by first invoking retrieval, then passing that output into the answering task. This keeps the control flow explicit instead of hiding it inside an opaque agent loop.
from crewai import Crew, Process

def rag_answer(question: str) -> str:
    retrieval_crew = Crew(
        agents=[retrieval_agent],
        tasks=[retrieve_task],
        process=Process.sequential,
        verbose=True,
    )
    context = retrieval_crew.kickoff(inputs={"question": question})

    answer_crew = Crew(
        agents=[answer_agent],
        tasks=[answer_task],
        process=Process.sequential,
        verbose=True,
    )
    result = answer_crew.kickoff(inputs={"question": question, "context": str(context)})
    return str(result)

if __name__ == "__main__":
    print(rag_answer("When should a claim be escalated?"))
Testing It
Run the script against questions whose answers are definitely present in your source files. If retrieval is working, you should see source snippets that clearly match key terms from your documents before the final answer is generated.
Then test failure cases by asking something outside your corpus; a good RAG pipeline should say it cannot find enough support rather than inventing details. If you want stronger guarantees, log both retrieved chunks and final outputs so you can inspect whether grounding actually happened.
For production use, add assertions around output shape and basic citation checks. If answers drift away from source text, reduce chunk size variance, tune k, or tighten the answer agent’s system instructions.
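One cheap drift check you can log alongside each answer is lexical overlap between the answer and the retrieved context. This is a rough heuristic of my own sketching, not a CrewAI feature: it misses paraphrase and cannot prove correctness, but a low score is a useful flag for manual review.

```python
import re

def grounding_ratio(answer: str, context: str) -> float:
    """Fraction of the answer's content words (4+ letters) that also occur
    in the retrieved context. Crude heuristic: high is reassuring, low is
    a flag for review; it cannot prove the answer is actually correct."""
    def content(text: str) -> set[str]:
        return set(re.findall(r"[a-z]{4,}", text.lower()))
    answer_words = content(answer)
    if not answer_words:
        return 0.0
    return len(answer_words & content(context)) / len(answer_words)

ctx = "[Source 1] Claims above $10,000 must be escalated to a senior adjuster."
print(grounding_ratio("Claims must be escalated to a senior adjuster.", ctx))  # 1.0
print(grounding_ratio("The policy covers dental procedures worldwide.", ctx))  # 0.0
```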
Next Steps
- Add citations to each sentence by returning source IDs from retrieve_context
- Replace FAISS with a managed vector database like Pinecone or pgvector
- Add evaluation with golden question-answer pairs and measure retrieval recall before shipping
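Retrieval recall is straightforward to measure once each golden question is tagged with the ID of the chunk that answers it. A minimal sketch, where the stub retriever and chunk IDs are invented purely for illustration:

```python
def recall_at_k(golden, retrieve, k: int = 4) -> float:
    """golden: (question, gold_chunk_id) pairs.
    recall@k = share of questions whose gold chunk appears in the top-k IDs."""
    hits = sum(1 for question, gold_id in golden if gold_id in retrieve(question)[:k])
    return hits / len(golden)

# Stub standing in for a real retriever that returns chunk IDs during evaluation.
def stub_retrieve(question: str) -> list[str]:
    return ["claims-01", "policy-07", "claims-02"]

golden = [
    ("When should a claim be escalated?", "claims-01"),   # retrievable -> a hit
    ("What is the dental coverage limit?", "policy-99"),  # not retrievable -> a miss
]
print(recall_at_k(golden, stub_retrieve, k=4))  # 0.5
```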
By Cyprian Aarons, AI Consultant at Topiax.