CrewAI Tutorial (Python): persisting agent state for intermediate developers

By Cyprian Aarons · Updated 2026-04-21

This tutorial shows you how to persist CrewAI agent state in Python so a workflow can resume after a process restart, a failed task, or a manual intervention. You need this when your agents handle multi-step work like claims triage, KYC review, or document extraction and you cannot afford to lose progress between runs.

What You'll Need

  • Python 3.10+
  • crewai
  • python-dotenv
  • An LLM API key, such as:
    • OPENAI_API_KEY
  • Basic familiarity with:
    • Agent
    • Task
    • Crew
    • Process
  • A local project folder with write access for state files

Step-by-Step

  1. Start by installing the packages and setting your API key. CrewAI can run with OpenAI-backed models out of the box, and we’ll use a simple .env file so the example stays portable.
pip install crewai python-dotenv
# .env
OPENAI_API_KEY=your_key_here
  2. Define an agent and tasks that produce structured outputs you can save. For persistence, the important part is that each step has a clear input and output boundary.
from dotenv import load_dotenv
load_dotenv()

from crewai import Agent, Task

researcher = Agent(
    role="Claims Researcher",
    goal="Collect relevant facts from the case notes",
    backstory="You work on insurance operations and summarize case details accurately.",
    verbose=True,
)

analyze_task = Task(
    description="Review the case notes and extract key facts: policy number, loss date, and issue summary.",
    expected_output="A concise bullet list with the extracted facts.",
    agent=researcher,
)

draft_task = Task(
    description="Turn the extracted facts into an internal triage note for the claims team.",
    expected_output="A short internal note with next actions.",
    agent=researcher,
)
  3. Create a small persistence layer that writes each task result to disk as JSON. This is the simplest reliable pattern: store state after every successful step, then reload it when you restart.
import json
from pathlib import Path

STATE_FILE = Path("crew_state.json")

def load_state() -> dict:
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text())
    return {"completed_tasks": {}, "run_count": 0}

def save_state(state: dict) -> None:
    STATE_FILE.write_text(json.dumps(state, indent=2))

state = load_state()
state["run_count"] += 1
save_state(state)
print(f"Loaded run #{state['run_count']}")
  4. Run tasks one at a time and persist each result immediately after it finishes. This gives you resumability without relying on hidden framework internals.
from crewai import Crew, Process

# The Crew wires the agent and tasks together. We deliberately skip crew.kickoff()
# below and execute each task individually, so every result can be persisted the
# moment it completes.
crew = Crew(
    agents=[researcher],
    tasks=[analyze_task, draft_task],
    process=Process.sequential,
    verbose=True,
)

if "analyze_task" not in state["completed_tasks"]:
    analyze_result = analyze_task.execute_sync()
    state["completed_tasks"]["analyze_task"] = str(analyze_result)
    save_state(state)

if "draft_task" not in state["completed_tasks"]:
    draft_result = draft_task.execute_sync()
    state["completed_tasks"]["draft_task"] = str(draft_result)
    save_state(state)

print(state["completed_tasks"]["draft_task"])
  5. Add resume logic so reruns skip completed work instead of starting over. In production, this is what keeps your workflow stable when a container restarts or an operator replays a job.
def run_with_resume():
    state = load_state()

    if "analyze_task" not in state["completed_tasks"]:
        result = analyze_task.execute_sync()
        state["completed_tasks"]["analyze_task"] = str(result)
        save_state(state)

    if "draft_task" not in state["completed_tasks"]:
        result = draft_task.execute_sync()
        state["completed_tasks"]["draft_task"] = str(result)
        save_state(state)

    return state["completed_tasks"]

final_state = run_with_resume()
print(final_state["draft_task"])
  6. If you want cleaner production behavior, separate persistence from execution and keep your saved payload small. Store task outputs, status flags, timestamps, and any IDs needed to reconstruct context later.
from datetime import datetime, timezone

def mark_complete(task_name: str, output: str) -> None:
    state = load_state()
    state["completed_tasks"][task_name] = {
        "output": output,
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated in Python 3.12+.
        "completed_at": datetime.now(timezone.utc).isoformat(),
    }
    save_state(state)

def get_output(task_name: str) -> str | None:
    state = load_state()
    item = state["completed_tasks"].get(task_name)
    if isinstance(item, dict):
        return item.get("output")
    return item

print(get_output("analyze_task"))

Testing It

Run the script once and confirm that crew_state.json is created with both task results stored under completed_tasks. Then run it again without deleting the file; the second run should skip already completed steps and reuse the saved outputs.
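As a quick check, a small helper can confirm both results landed in the file. The name `verify_state` is ours for illustration, not a CrewAI API:

```python
import json
from pathlib import Path

STATE_FILE = Path("crew_state.json")

def verify_state(path: Path = STATE_FILE) -> bool:
    """True when both tutorial tasks have a saved result in the state file."""
    if not path.exists():
        return False
    done = json.loads(path.read_text()).get("completed_tasks", {})
    return "analyze_task" in done and "draft_task" in done

print(verify_state())
```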

To test failure recovery, delete only one task entry from crew_state.json and rerun the script. The missing task should execute again while the completed task stays untouched.
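Instead of hand-editing the JSON, a small helper (again, our name, not part of CrewAI) can drop a single entry safely:

```python
import json
from pathlib import Path

STATE_FILE = Path("crew_state.json")

def reset_task(task_name: str) -> None:
    """Remove one task entry so only that step re-executes on the next run."""
    if not STATE_FILE.exists():
        return
    state = json.loads(STATE_FILE.read_text())
    state["completed_tasks"].pop(task_name, None)
    STATE_FILE.write_text(json.dumps(state, indent=2))

reset_task("draft_task")  # force only the draft step to rerun
```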

If you’re wiring this into a real service, log the run ID and task names before saving state. That makes it easy to trace where execution stopped when debugging retries or partial failures.
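A minimal sketch of that logging, where `run_id` is a hypothetical identifier your service would supply and `save_state_logged` is an illustrative wrapper, not a library call:

```python
import json
import logging
from pathlib import Path

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("crew.persistence")

def save_state_logged(state: dict, run_id: str, task_name: str,
                      path: Path = Path("crew_state.json")) -> None:
    """Log the run ID and task name before persisting, so you can trace
    where execution stopped when a retry or partial failure occurs."""
    log.info("run_id=%s task=%s saving state", run_id, task_name)
    path.write_text(json.dumps(state, indent=2))

save_state_logged({"completed_tasks": {}}, run_id="run-001", task_name="analyze_task")
```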

Next Steps

  • Move from JSON files to PostgreSQL or Redis for multi-worker persistence.
  • Add per-task input hashes so you can detect stale results when upstream data changes.
  • Wrap this pattern in a small repository class so every crew in your codebase uses the same resume logic.
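The input-hash idea from the list above can be sketched like this; the helper names are illustrative, and the stored `input_hash` field is an assumption about how you extend the saved entry:

```python
import hashlib
import json

def input_hash(payload: dict) -> str:
    """Stable fingerprint of a task's inputs; a changed hash means the
    cached result was produced from different upstream data."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()

def is_stale(saved_entry: dict, current_inputs: dict) -> bool:
    return saved_entry.get("input_hash") != input_hash(current_inputs)

entry = {"output": "triage note", "input_hash": input_hash({"case_id": "C-123"})}
print(is_stale(entry, {"case_id": "C-123"}))  # False: inputs unchanged, reuse result
print(is_stale(entry, {"case_id": "C-999"}))  # True: upstream data changed, rerun
```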

By Cyprian Aarons, AI Consultant at Topiax.