CrewAI Tutorial (Python): implementing retry logic for advanced developers

By Cyprian AaronsUpdated 2026-04-21
crewaiimplementing-retry-logic-for-advanced-developerspython

This tutorial shows you how to add retry logic around CrewAI agent execution in Python without turning your workflow into a mess of nested try/except blocks. You need this when agents fail intermittently because of rate limits, flaky tool calls, transient API errors, or bad intermediate outputs that should be retried instead of failing the whole run.

What You'll Need

  • Python 3.10+
  • crewai
  • python-dotenv
  • An OpenAI API key set as OPENAI_API_KEY
  • Optional: tenacity if you want more advanced retry policies later
  • A working CrewAI project with at least one agent and one task

Install the basics:

pip install crewai python-dotenv

Step-by-Step

  1. Start with a minimal CrewAI setup so you have a stable baseline before adding retries. The key is to keep the crew definition clean and move retry behavior outside the agent itself.
import os
from dotenv import load_dotenv
from crewai import Agent, Task, Crew, Process

load_dotenv()

researcher = Agent(
    role="Research Analyst",
    goal="Produce accurate summaries",
    backstory="You analyze technical topics carefully and concisely.",
    verbose=True,
)

task = Task(
    description="Write a short summary of why retry logic matters in distributed systems.",
    expected_output="A concise technical summary.",
    agent=researcher,
)

crew = Crew(
    agents=[researcher],
    tasks=[task],
    process=Process.sequential,
    verbose=True,
)
  1. Wrap crew execution in a retry function that catches transient failures and tries again with backoff. This is the simplest production-safe pattern when you want retries without changing CrewAI internals.
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def run_with_retries(fn: Callable[[], T], max_attempts: int = 3, base_delay: float = 2.0) -> T:
    last_error = None

    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            last_error = exc
            if attempt == max_attempts:
                raise
            sleep_for = base_delay * attempt
            print(f"Attempt {attempt} failed: {exc}. Retrying in {sleep_for:.1f}s...")
            time.sleep(sleep_for)

    raise last_error  # pragma: no cover
  1. Use the wrapper around crew.kickoff() so every run gets retry protection. This keeps your agent code unchanged and makes the failure policy explicit at the orchestration layer.
def run_crew():
    return crew.kickoff()

if __name__ == "__main__":
    result = run_with_retries(run_crew, max_attempts=4, base_delay=1.5)
    print("\n=== FINAL RESULT ===")
    print(result)
  1. If your failure mode is specific to LLM calls, add targeted exception handling instead of retrying everything blindly. In production, you usually want to retry on rate limits and transient network errors, but fail fast on prompt bugs or invalid task definitions.
from openai import RateLimitError, APIConnectionError, APITimeoutError

RETRYABLE_ERRORS = (RateLimitError, APIConnectionError, APITimeoutError)

def run_with_llm_retries(fn: Callable[[], T], max_attempts: int = 3) -> T:
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except RETRYABLE_ERRORS as exc:
            if attempt == max_attempts:
                raise
            delay = 2 ** attempt
            print(f"Retryable LLM error on attempt {attempt}: {exc}. Sleeping {delay}s.")
            time.sleep(delay)
  1. For advanced workflows, separate task-level retries from full-crew retries. If one task fails because its input is malformed or a tool call times out, rerunning the entire crew may be wasteful; rerun only the failing unit when possible.
def safe_task_run(task_fn: Callable[[], T]) -> T:
    return run_with_retries(task_fn, max_attempts=2, base_delay=1.0)

def execute():
    first_pass = crew.kickoff()
    return first_pass

if __name__ == "__main__":
    output = safe_task_run(execute)
    print(output)

Testing It

Run the script once with a valid API key and confirm the crew completes normally. Then temporarily force a failure by disconnecting network access or using an invalid model configuration to see the retry messages appear before the final exception.

If you want a cleaner test, monkeypatch crew.kickoff() in a unit test to raise an exception on the first call and succeed on the second. That verifies your retry wrapper is actually re-invoking execution rather than just swallowing errors.

Also check that non-retryable errors still fail immediately if you use targeted exception handling. That’s where most bad retry implementations break: they keep hammering on deterministic bugs.

Next Steps

  • Add exponential backoff with jitter using tenacity
  • Move retries into a reusable orchestration layer for all crews in your codebase
  • Add structured logging so every failed attempt includes task name, exception type, and correlation ID

Keep learning

By Cyprian Aarons, AI Consultant at Topiax.

Want the complete 8-step roadmap?

Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.

Get the Starter Kit

Related Guides