How to Build a Transaction Monitoring Agent Using CrewAI in Python for Fintech

By Cyprian Aarons · Updated 2026-04-21
Tags: transaction-monitoring, crewai, python, fintech

A transaction monitoring agent scans payment events, flags suspicious patterns, and routes cases for review before losses compound. In fintech, that matters because the difference between a clean ledger and a regulatory headache is often a few milliseconds of detection plus a solid audit trail.

Architecture

  • Transaction intake

    • Pulls events from Kafka, SQS, webhook endpoints, or a database stream.
    • Normalizes fields like amount, currency, merchant_id, customer_id, country, and timestamp.
  • Risk scoring tool

    • Applies deterministic rules first: velocity checks, amount thresholds, geolocation mismatches, sanctioned country lookups.
    • Returns structured risk signals the agent can reason over.
  • Investigation agent

    • Uses CrewAI to analyze the event, compare it against policy, and decide whether to escalate.
    • Produces an explainable decision with evidence.
  • Case management tool

    • Writes alerts to your case system: Jira, ServiceNow, Salesforce, or an internal queue.
    • Stores the reason codes and raw evidence for audit.
  • Compliance logging

    • Persists every input, tool call, and output in immutable storage.
    • Supports model governance, internal review, and regulator requests.
  • Policy layer

    • Enforces what the agent can and cannot do.
    • Blocks PII leakage, unsupported recommendations, and decisions outside approved thresholds.
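As a concrete example of the intake step, a minimal normalizer might map a raw webhook payload onto the canonical field names (the raw key names here are hypothetical, not from any specific provider):

```python
# Minimal intake normalizer: maps a raw webhook payload (hypothetical key
# names) onto the canonical fields the rest of the pipeline expects.
def normalize_event(raw: dict) -> dict:
    return {
        "transaction_id": raw["id"],
        "customer_id": raw["customer"],
        "merchant_id": raw["merchant"],
        "amount": float(raw["amount"]),
        "currency": raw["currency"].upper(),
        "country": raw["country"].upper(),
        "timestamp": raw["created_at"],
        "channel": raw.get("channel", "unknown"),
    }

event = normalize_event({
    "id": "tx_1", "customer": "cus_1", "merchant": "m_1",
    "amount": "42.50", "currency": "usd", "country": "us",
    "created_at": "2026-04-21T12:30:00Z",
})
```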

Implementation

1) Install CrewAI and define the transaction schema

Use a strict schema so your agent does not infer missing fields. In fintech, loose inputs become bad alerts fast.

from pydantic import BaseModel, Field
from typing import Optional

class TransactionEvent(BaseModel):
    transaction_id: str
    customer_id: str
    merchant_id: str
    amount: float = Field(gt=0)
    currency: str
    country: str
    timestamp: str
    channel: str
    device_id: Optional[str] = None
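To see the strictness in action, here is a standalone sketch; the `StrictTxn` model mirrors the key constraints above, and `extra="forbid"` is an optional hardening not present in the original model:

```python
from pydantic import BaseModel, ConfigDict, Field, ValidationError

class StrictTxn(BaseModel):
    # Mirrors the key constraints of TransactionEvent above.
    model_config = ConfigDict(extra="forbid")  # reject unexpected fields
    transaction_id: str
    amount: float = Field(gt=0)
    currency: str

ok = StrictTxn(transaction_id="tx_1", amount=10.0, currency="USD")

# A zero amount violates Field(gt=0) and raises instead of producing a bad alert.
try:
    StrictTxn(transaction_id="tx_2", amount=0, currency="USD")
    raised = False
except ValidationError:
    raised = True
```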

2) Create tools for rule checks and alert creation

Keep hard controls outside the LLM. The model should explain and classify; your tools should enforce policy.

import json

from crewai.tools import BaseTool
from pydantic import BaseModel as PydanticBaseModel

class RiskCheckInput(PydanticBaseModel):
    transaction_id: str
    amount: float
    country: str
    channel: str

class RuleBasedRiskTool(BaseTool):
    name: str = "rule_based_risk_check"
    description: str = "Applies deterministic AML/fraud rules to a transaction."
    args_schema: type[PydanticBaseModel] = RiskCheckInput

    def _run(self, transaction_id: str, amount: float, country: str, channel: str) -> str:
        reasons = []
        score = 0

        if amount >= 10000:
            score += 40
            reasons.append("high_amount")
        if country in {"IR", "KP", "SY"}:
            score += 80
            reasons.append("sanctioned_country")
        if channel.lower() == "card" and amount >= 3000:
            score += 20
            reasons.append("card_velocity_proxy")

        # Return JSON, not a Python repr, so downstream parsing is unambiguous.
        return json.dumps({
            "transaction_id": transaction_id,
            "risk_score": min(score, 100),
            "reasons": reasons,
        })

class AlertInput(PydanticBaseModel):
    payload: str

class CreateAlertTool(BaseTool):
    name: str = "create_alert"
    description: str = "Creates a compliance alert record."
    args_schema: type[PydanticBaseModel] = AlertInput

    def _run(self, payload: str) -> str:
        # Replace with real case management integration.
        return f"alert_created:{payload}"
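Because the rules are deterministic, one option (a sketch, not part of the tool above) is to factor scoring into a pure function you can unit-test without instantiating any CrewAI objects, then call it from `_run`:

```python
def score_transaction(amount: float, country: str, channel: str) -> tuple[int, list[str]]:
    """Pure rule scoring; mirrors the logic inside RuleBasedRiskTool."""
    reasons = []
    score = 0
    if amount >= 10000:
        score += 40
        reasons.append("high_amount")
    if country in {"IR", "KP", "SY"}:
        score += 80
        reasons.append("sanctioned_country")
    if channel.lower() == "card" and amount >= 3000:
        score += 20
        reasons.append("card_velocity_proxy")
    # Cap the score so downstream thresholds stay on a 0-100 scale.
    return min(score, 100), reasons
```

Keeping the rules pure also makes drift checks and threshold changes easy to review in isolation.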

3) Build the CrewAI agents and task flow

This is the core pattern. The analyst agent inspects the event; the compliance agent turns that into an auditable disposition.

from crewai import Agent, Task, Crew, Process

risk_tool = RuleBasedRiskTool()
alert_tool = CreateAlertTool()

analyst = Agent(
    role="Transaction Monitoring Analyst",
    goal="Assess whether a transaction requires fraud or AML escalation.",
    backstory=(
        "You review fintech transactions using policy rules and evidence. "
        "You never invent facts. You always cite risk signals."
    ),
    tools=[risk_tool],
    verbose=True,
)

compliance_officer = Agent(
    role="Compliance Review Officer",
    goal="Convert analyst findings into an auditable case decision.",
    backstory=(
        "You write concise dispositions for regulators and internal audit. "
        "You only approve escalation when evidence supports it."
    ),
    tools=[alert_tool],
    verbose=True,
)

def build_monitoring_crew(event_dict: dict) -> Crew:
    analyze_task = Task(
        description=(
            f"Review this transaction for suspicious activity:\n{event_dict}\n\n"
            "Call the risk check tool first. Return JSON with fields "
            "`decision`, `risk_score`, `reasons`, `summary`."
        ),
        expected_output="A structured JSON assessment of the transaction.",
        agent=analyst,
    )

    escalate_task = Task(
        description=(
            "If risk is medium or high, create an alert record using the alert tool. "
            "Include reason codes and preserve auditability."
        ),
        expected_output="A case disposition or no-action result.",
        agent=compliance_officer,
        context=[analyze_task],
    )

    return Crew(
        agents=[analyst, compliance_officer],
        tasks=[analyze_task, escalate_task],
        process=Process.sequential,
        verbose=True,
    )

4) Run it with a real transaction payload

In production you would call this from your stream processor or API worker.

if __name__ == "__main__":
    event = TransactionEvent(
        transaction_id="tx_10001",
        customer_id="cus_7781",
        merchant_id="m_4422",
        amount=12500.0,
        currency="USD",
        country="US",
        timestamp="2026-04-21T12:30:00Z",
        channel="card",
        device_id="dev_9911",
    ).model_dump()

    crew = build_monitoring_crew(event)
    result = crew.kickoff()
    print(result)
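A minimal worker loop around the snippet above might look like this. It is a sketch using a stdlib queue as a stand-in for Kafka or SQS; `handle` is a placeholder for a function that wraps `build_monitoring_crew(event).kickoff()`:

```python
import queue

def run_worker(q: "queue.Queue[dict]", handle) -> list:
    # Drain the queue, calling the crew (or any handler) once per event.
    results = []
    while True:
        try:
            event = q.get_nowait()
        except queue.Empty:
            break
        try:
            results.append(handle(event))
        except Exception as exc:
            # In production: dead-letter the event and alert on handler failures.
            results.append({"error": str(exc), "event": event})
    return results

q = queue.Queue()
q.put({"transaction_id": "tx_10001", "amount": 12500.0})
out = run_worker(q, handle=lambda e: {"processed": e["transaction_id"]})
```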

Production Considerations

  • Deployment

    • Run the agent behind an internal API or worker queue.
    • Keep it stateless; persist cases in your own datastore with tenant isolation.
    • For data residency, pin processing to the region where customer data is allowed to live.
  • Monitoring

    • Track alert rate, false positive rate, average review time, and tool-call failures.
    • Log every prompt input/output pair with redaction for PANs, account numbers, emails, and device identifiers.
    • Add drift checks on rule hit rates by geography and channel.
  • Guardrails

    • Never let the LLM make final SAR/STR filing decisions without human approval.
    • Restrict tools to approved actions only; no free-form database writes.
    • Enforce policy filters for PII handling and jurisdiction-specific compliance rules.
  • Auditability

    • Store reason codes alongside model outputs.
    • Version prompts, tools, thresholds, and CrewAI task definitions.
    • Keep immutable logs for regulator review and internal model governance.
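The redaction point above can be sketched with a couple of regexes; the patterns are illustrative, not a complete PII taxonomy:

```python
import re

PAN_RE = re.compile(r"\b\d{13,19}\b")          # candidate card numbers
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")  # simple email matcher

def redact(text: str) -> str:
    """Mask PAN-like digit runs and emails before logging prompt I/O."""
    text = PAN_RE.sub("[PAN]", text)
    return EMAIL_RE.sub("[EMAIL]", text)
```

Run every prompt input and output through a function like this before it touches the log store.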

Common Pitfalls

  • Using the agent as the source of truth

    • Mistake: letting the LLM invent risk scores or override deterministic controls.
    • Fix: compute hard rules in code first; use CrewAI for explanation and triage.
  • Skipping structured outputs

    • Mistake: accepting free-text summaries from the agent.
    • Fix: require JSON-shaped responses with explicit fields like decision, risk_score, and reasons.
  • Ignoring compliance boundaries

    • Mistake: sending raw PII into prompts or routing data across regions without checking residency rules.
    • Fix: redact sensitive fields before inference and keep regional processing aligned with legal requirements.

Keep learning

By Cyprian Aarons, AI Consultant at Topiax.

Want the complete 8-step roadmap?

Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.
