CrewAI Tutorial (Python): implementing retry logic for intermediate developers

By Cyprian Aarons · Updated 2026-04-21

This tutorial shows how to add retry logic to a CrewAI workflow in Python without turning your agents into a mess of nested try/except blocks. You’ll build a small pattern that retries failed agent runs, backs off between attempts, and keeps your crew usable when an LLM call or tool invocation fails transiently.

What You'll Need

  • Python 3.10 or newer
  • crewai
  • langchain-openai
  • python-dotenv
  • An OpenAI API key set as OPENAI_API_KEY
  • Basic familiarity with:
    • Agent
    • Task
    • Crew
    • kickoff()

Install the packages:

pip install crewai langchain-openai python-dotenv

Step-by-Step

  1. First, create a minimal CrewAI setup with one agent and one task. Keep the retry logic outside the agent itself so you can reuse it across different crews and tasks.
import os
from dotenv import load_dotenv
from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI

load_dotenv()

llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.2,
    api_key=os.getenv("OPENAI_API_KEY"),
)

researcher = Agent(
    role="Research Analyst",
    goal="Summarize the user's request accurately",
    backstory="You produce concise, reliable summaries.",
    llm=llm,
    verbose=True,
)

task = Task(
    description="Summarize why retry logic matters in agent workflows.",
    expected_output="A short explanation of retry logic in production AI systems.",
    agent=researcher,
)

crew = Crew(
    agents=[researcher],
    tasks=[task],
    process=Process.sequential,
    verbose=True,
)
  2. Next, wrap crew.kickoff() in a retry function. This is the core pattern: catch transient exceptions, wait, then try again with exponential backoff.
import time


def run_with_retries(crew: Crew, max_attempts: int = 3, base_delay: float = 2.0):
    last_error = None

    for attempt in range(1, max_attempts + 1):
        try:
            print(f"Attempt {attempt}/{max_attempts}")
            return crew.kickoff()
        except Exception as exc:
            last_error = exc
            if attempt == max_attempts:
                break

            delay = base_delay * (2 ** (attempt - 1))
            print(f"Run failed: {exc}")
            print(f"Retrying in {delay:.1f}s...")
            time.sleep(delay)

    raise last_error
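One refinement worth considering: if several crews fail and retry on the same schedule, their retries land at the same moments and can prolong the overload that caused the failure. A common fix is "full jitter": pick a random delay within the exponential window instead of the exact value. The `backoff_delay` helper below is an illustrative sketch, not part of CrewAI:

```python
import random


def backoff_delay(attempt: int, base_delay: float = 2.0, cap: float = 30.0) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap, base * 2^(attempt-1))].

    The cap keeps late attempts from waiting unreasonably long.
    """
    window = min(cap, base_delay * (2 ** (attempt - 1)))
    return random.uniform(0, window)
```

Swap `delay = base_delay * (2 ** (attempt - 1))` in the wrapper for `delay = backoff_delay(attempt, base_delay)` to use it.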
  3. Then make the retry logic more practical by only retrying errors that are likely transient. You do not want to keep retrying on bad prompts or schema mistakes that will fail every time.
def is_retryable_error(exc: Exception) -> bool:
    message = str(exc).lower()

    retryable_markers = [
        "rate limit",
        "timeout",
        "temporarily unavailable",
        "connection reset",
        "502",
        "503",
        "504",
        "server error",
    ]

    return any(marker in message for marker in retryable_markers)


def run_with_selective_retries(crew: Crew, max_attempts: int = 3):
    for attempt in range(1, max_attempts + 1):
        try:
            return crew.kickoff()
        except Exception as exc:
            if attempt == max_attempts or not is_retryable_error(exc):
                raise

            wait_seconds = 2 ** (attempt - 1)
            print(f"Retryable failure: {exc}")
            print(f"Waiting {wait_seconds}s before retry...")
            time.sleep(wait_seconds)
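Matching on message strings works, but it is brittle: providers change their error text. Where your SDK exposes typed exceptions (recent versions of the OpenAI Python library raise `openai.RateLimitError`, for instance), checking types is more robust. Here is a sketch that checks Python's built-in network-style exception types first and falls back to message markers; `is_retryable_exception` is an illustrative name, not a CrewAI helper:

```python
def is_retryable_exception(exc: Exception) -> bool:
    """Prefer exception types over string matching where possible."""
    # Built-in timeout/connection errors are almost always transient.
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return True
    # Fall back to message markers for provider-specific HTTP errors.
    message = str(exc).lower()
    return any(m in message for m in ("rate limit", "timeout", "502", "503", "504"))
```

`ConnectionResetError` and friends subclass `ConnectionError`, so the `isinstance` check covers them without extra cases.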
  4. After that, use the wrapper in your main execution path. This keeps your application code clean and makes retries consistent across different crews.
if __name__ == "__main__":
    if not os.getenv("OPENAI_API_KEY"):
        raise ValueError("OPENAI_API_KEY is not set")

    result = run_with_selective_retries(crew, max_attempts=4)
    print("\nFinal result:")
    print(result)
  5. If you want production-grade behavior, add logging instead of plain prints and include attempt metadata. That gives you observability when a specific agent keeps failing under load.
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("crewai-retries")


def run_with_logged_retries(crew: Crew, max_attempts: int = 3):
    for attempt in range(1, max_attempts + 1):
        try:
            logger.info("Running crew attempt %s/%s", attempt, max_attempts)
            return crew.kickoff()
        except Exception as exc:
            logger.warning("Attempt %s failed: %s", attempt, exc)

            if attempt == max_attempts or not is_retryable_error(exc):
                logger.error("Giving up after %s attempts", attempt)
                raise

            time.sleep(2 ** (attempt - 1))

Testing It

Run the script normally first and confirm you get a valid response from the agent. Then simulate a transient failure, for example by temporarily blocking outbound network access or by firing enough rapid calls to hit a rate limit; you should see retries happen before the run succeeds or finally fails.

Also test a non-retryable failure by breaking the prompt or expected output format so the error is deterministic. In that case, the wrapper should fail immediately instead of wasting time on useless retries.

If you want stronger validation, log each attempt number and verify that backoff increases between retries. That tells you your wrapper is behaving predictably under load.
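You can also exercise the wrapper without spending API calls by substituting a fake crew that fails a set number of times before succeeding. In this sketch, `FlakyCrew` is a test stand-in (not a CrewAI class), and the wrapper is a simplified copy of the tutorial's version so the example is self-contained:

```python
import time


class FlakyCrew:
    """Test double for Crew: raises a retryable error N times, then succeeds."""

    def __init__(self, failures: int = 2):
        self.failures = failures
        self.attempts = 0

    def kickoff(self):
        self.attempts += 1
        if self.attempts <= self.failures:
            raise RuntimeError("503 temporarily unavailable")
        return "ok"


def run_with_selective_retries(crew, max_attempts: int = 3):
    # Simplified copy of the tutorial wrapper; retryable check reduced to "503".
    for attempt in range(1, max_attempts + 1):
        try:
            return crew.kickoff()
        except Exception as exc:
            if attempt == max_attempts or "503" not in str(exc):
                raise
            time.sleep(0)  # no real waiting in tests


flaky = FlakyCrew(failures=2)
assert run_with_selective_retries(flaky, max_attempts=4) == "ok"
assert flaky.attempts == 3  # two failures plus the successful attempt
```

The same double lets you verify the non-retryable path: raise `ValueError("bad schema")` from `kickoff()` and assert the wrapper re-raises after exactly one attempt.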

Next Steps

  • Add per-task retry policies so some tasks can fail fast while others can retry longer.
  • Wrap tool calls inside agents with the same selective-retry pattern.
  • Add structured logging and metrics so you can track retry rates in production.
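For the first bullet, a small dataclass is enough to carry per-task retry knobs; every name below is hypothetical, since CrewAI has no built-in policy object:

```python
from dataclasses import dataclass


@dataclass
class RetryPolicy:
    """Hypothetical per-crew retry configuration."""
    max_attempts: int = 3
    base_delay: float = 2.0


# Example: research tolerates retries, publishing fails fast.
POLICIES = {
    "research": RetryPolicy(max_attempts=5, base_delay=1.0),
    "publish": RetryPolicy(max_attempts=1),
}


def delays_for(policy: RetryPolicy) -> list[float]:
    """The backoff schedule a policy implies (one delay per retry, not per attempt)."""
    return [policy.base_delay * (2 ** i) for i in range(policy.max_attempts - 1)]
```

Pass the looked-up policy's fields into `run_with_selective_retries` so each task gets its own budget.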

By Cyprian Aarons, AI Consultant at Topiax.
