How to Build a Transaction Monitoring Agent Using CrewAI in Python for Investment Banking

By Cyprian Aarons · Updated 2026-04-21
transaction-monitoring · crewai · python · investment-banking

A transaction monitoring agent watches trade and payment activity, flags patterns that look suspicious, and routes cases for human review. In investment banking, that matters because you need to catch market abuse, sanctions exposure, layering, wash trading, and unusual client behavior without drowning compliance teams in false positives.

Architecture

  • Ingestion layer

    • Pulls transactions from OMS, FIX gateways, payment rails, or a warehouse.
    • Normalizes records into a common schema: client, instrument, venue, timestamp, amount, counterparty.
  • Rules and feature extraction

    • Computes risk features like velocity spikes, round-tripping patterns, notional concentration, and jurisdiction risk.
    • Applies deterministic checks before the LLM ever sees the data.
  • CrewAI agent layer

    • Uses a Crew with specialized Agent roles:
      • triage analyst
      • policy reviewer
      • case summarizer
    • Produces structured assessments instead of free-form chat.
  • Compliance evidence store

    • Persists outputs, prompts, model versions, timestamps, and source transaction IDs.
    • Needed for auditability and model governance.
  • Escalation workflow

    • Sends high-risk cases to investigators via ticketing or case management.
    • Keeps human-in-the-loop approval for anything that can affect filings or client action.
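
The normalization step in the ingestion layer can be sketched as a plain mapping function. The raw field names below (`ClientRef`, `Symbol`, `TransactTime`, and so on) are illustrative FIX-style examples, not a prescription; map whatever your OMS and payment feeds actually emit onto the common schema:

```python
def normalize_record(raw: dict) -> dict:
    """Map a raw source record onto the common schema.

    The source field names checked here are hypothetical examples;
    replace them with the tags your own feeds use.
    """
    return {
        "client": raw.get("ClientRef") or raw.get("client_id"),
        "instrument": raw.get("Symbol") or raw.get("instrument"),
        "venue": raw.get("ExDestination", "UNKNOWN"),
        "timestamp": raw.get("TransactTime"),
        "amount": float(raw.get("GrossTradeAmt", 0.0)),
        "counterparty": raw.get("ContraBroker", "UNKNOWN"),
    }

record = normalize_record({
    "ClientRef": "C-1",
    "Symbol": "EURUSD",
    "TransactTime": "2026-04-21T10:00:00Z",
    "GrossTradeAmt": "1000000",
})
```

Records that fail normalization should be quarantined, not silently dropped; gaps in coverage are themselves a surveillance risk.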

Implementation

1) Install dependencies and define the transaction schema

Start with a narrow schema. In banking systems, garbage in means noisy alerts out.

pip install crewai pydantic pandas

from pydantic import BaseModel
from typing import List

class Transaction(BaseModel):
    transaction_id: str
    client_id: str
    instrument: str
    venue: str
    amount_usd: float
    jurisdiction: str
    timestamp: str
    counterparty: str
    flags: List[str] = []
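
Pydantic enforces that schema at the boundary, so malformed records fail loudly instead of turning into noisy alerts downstream. A quick demonstration (the schema is re-declared here so the snippet runs standalone):

```python
from typing import List

from pydantic import BaseModel, ValidationError

class Transaction(BaseModel):
    transaction_id: str
    client_id: str
    instrument: str
    venue: str
    amount_usd: float
    jurisdiction: str
    timestamp: str
    counterparty: str
    flags: List[str] = []

# A record with a non-numeric amount is rejected before any scoring runs.
try:
    Transaction(
        transaction_id="TX-1", client_id="C-1", instrument="EURUSD_SWAP",
        venue="LSE", amount_usd="not-a-number", jurisdiction="GB",
        timestamp="2026-04-21T10:15:00Z", counterparty="CP-1",
    )
    rejected = False
except ValidationError:
    rejected = True
```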

2) Create tools for rule checks and evidence retrieval

Keep deterministic checks outside the model. The agent should explain findings, not invent them.

from crewai.tools import tool

@tool("rule_based_risk_score")
def rule_based_risk_score(transaction_json: str) -> str:
    """
    Score a transaction using simple AML/market abuse heuristics.
    Input must be a JSON string representing a transaction.
    """
    import json

    tx = json.loads(transaction_json)
    score = 0
    reasons = []

    if tx["amount_usd"] > 5_000_000:
        score += 30
        reasons.append("High notional value")

    if tx["jurisdiction"] in ["IR", "KP", "SY"]:
        score += 50
        reasons.append("High-risk jurisdiction")

    if "rapid_reversal" in tx.get("flags", []):
        score += 40
        reasons.append("Rapid reversal pattern")

    return json.dumps({"score": score, "reasons": reasons})


@tool("fetch_policy_excerpt")
def fetch_policy_excerpt(topic: str) -> str:
    """
    Return a short internal compliance policy excerpt by topic.
    Replace with your document store or vector search.
    """
    policies = {
        "sanctions": "Escalate any exposure involving sanctioned jurisdictions or entities.",
        "market_abuse": "Review repeated same-day buy/sell activity and suspicious layering patterns.",
        "recordkeeping": "Retain alert rationale, source data references, and reviewer actions."
    }
    return policies.get(topic.lower(), "No policy excerpt found.")
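
Because the heuristics are deterministic, they are easy to unit test outside the agent. Here is the same scoring logic written as a plain function so it runs without CrewAI installed (only a sanity-check sketch, not a replacement for the tool above):

```python
def score_tx(tx: dict) -> dict:
    """Same heuristics as rule_based_risk_score, as a plain function."""
    score, reasons = 0, []
    if tx["amount_usd"] > 5_000_000:
        score += 30
        reasons.append("High notional value")
    if tx["jurisdiction"] in ["IR", "KP", "SY"]:
        score += 50
        reasons.append("High-risk jurisdiction")
    if "rapid_reversal" in tx.get("flags", []):
        score += 40
        reasons.append("Rapid reversal pattern")
    return {"score": score, "reasons": reasons}

# High notional (30) + rapid reversal (40), no jurisdiction hit.
result = score_tx({
    "amount_usd": 12_400_000,
    "jurisdiction": "GB",
    "flags": ["rapid_reversal"],
})
```

Pinning these expectations in tests means a threshold change is a deliberate, reviewable event rather than a silent drift.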

3) Build the CrewAI agents and task flow

Use one agent to triage risk and another to produce an auditable summary. The output should be structured enough for downstream systems.

import json
from crewai import Agent, Task, Crew, Process

triage_agent = Agent(
    role="Transaction Triage Analyst",
    goal="Identify suspicious transaction patterns using rules and compliance policy.",
    backstory=(
        "You work in an investment bank's surveillance team. "
        "You must be precise, conservative, and audit-friendly."
    ),
    tools=[rule_based_risk_score, fetch_policy_excerpt],
    verbose=True,
)

summary_agent = Agent(
    role="Case Summary Writer",
    goal="Produce a concise investigation summary for compliance reviewers.",
    backstory=(
        "You write case notes for AML and market surveillance teams. "
        "You never speculate beyond available evidence."
    ),
)

triage_task = Task(
    description=(
        "Review this transaction JSON: {transaction_json}. "
        "Call rule_based_risk_score first. Then consult policy excerpts if needed. "
        "Return JSON with keys: risk_level, score, reasons, recommended_action."
    ),
    expected_output="Valid JSON only.",
    agent=triage_agent,
)

summary_task = Task(
    description=(
        "Using the triage result and original transaction JSON, write an investigator summary "
        "with fields: case_summary, key_risks, audit_notes."
    ),
    expected_output="Structured summary suitable for case management.",
    agent=summary_agent,
)

crew = Crew(
    agents=[triage_agent, summary_agent],
    tasks=[triage_task, summary_task],
    process=Process.sequential,
    verbose=True,
)

4) Run the agent on real transactions and persist the result

In production you would batch this from Kafka or your warehouse. Here’s the execution pattern you actually want.

sample_tx = Transaction(
    transaction_id="TX-100928",
    client_id="C-44122",
    instrument="EURUSD_SWAP",
    venue="LSE",
    amount_usd=12_400_000,
    jurisdiction="GB",
    timestamp="2026-04-21T10:15:00Z",
    counterparty="CP-7781",
    flags=["rapid_reversal"]
)

result = crew.kickoff(inputs={"transaction_json": sample_tx.model_dump_json()})

print(result)

That gives you an auditable workflow:

  • deterministic scoring first,
  • policy lookup second,
  • structured output last.
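
For batch runs, the same crew can be kicked off per transaction with basic error isolation, so one malformed record does not halt the run. This is a sketch: `run_batch`, `archive`, and the stub callable are illustrative names, and in practice you would pass `crew.kickoff` in place of the stub:

```python
import json

def run_batch(transactions, kickoff, archive):
    """Run a scoring callable over transactions, isolating failures.

    `kickoff` stands in for crew.kickoff; `archive` collects results
    for the compliance evidence store.
    """
    failures = []
    for tx in transactions:
        try:
            result = kickoff({"transaction_json": json.dumps(tx)})
            archive.append({"transaction_id": tx["transaction_id"],
                            "result": result})
        except Exception as exc:
            failures.append({"transaction_id": tx["transaction_id"],
                             "error": str(exc)})
    return failures

archive = []
failures = run_batch(
    [{"transaction_id": "TX-1"}, {"transaction_id": "TX-2"}],
    kickoff=lambda inputs: "ok",  # stub; replace with crew.kickoff
    archive=archive,
)
```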

If you need stricter control over output format in a real deployment:

  • validate the final string with json.loads()
  • reject malformed responses
  • store both raw output and parsed output in your case archive
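
That validation step can be a small gate between the agent and the case archive. The required keys below follow the triage task's expected output; keep the raw string alongside the parsed dict so the audit trail preserves the original:

```python
import json

REQUIRED_KEYS = {"risk_level", "score", "reasons", "recommended_action"}

def parse_triage_output(raw: str):
    """Return the parsed triage result, or None if malformed.

    Callers should persist `raw` either way, so rejected responses
    are still visible to reviewers.
    """
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(parsed, dict) or not REQUIRED_KEYS <= parsed.keys():
        return None
    return parsed

good = parse_triage_output(
    '{"risk_level": "high", "score": 70, "reasons": [],'
    ' "recommended_action": "escalate"}'
)
bad = parse_triage_output("Risk looks high, escalate.")
```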

Production Considerations

  • Deployment boundaries

    • Run the agent inside your bank’s approved environment.
    • Keep data residency aligned with jurisdictional requirements; do not ship client trade data to unmanaged external services.
  • Audit trail

    • Persist input payloads, tool outputs, final decisions, prompt versions, model versions, and timestamps.
    • Regulators will care more about traceability than clever prompts.
  • Human approval gates

    • Never auto-file SAR/STR actions from an agent alone.
    • Use the agent to recommend escalation; let compliance approve final action.
  • Monitoring

    • Track false positive rate by desk, asset class, region, and client segment.
    • Alert on drift when alert volumes spike after model or policy changes.
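
A minimal version of that monitoring can be computed from reviewer dispositions. The `desk` and `disposition` fields and the 1.5x drift threshold here are assumptions; substitute whatever your case management system records and whatever threshold your surveillance team calibrates:

```python
from collections import defaultdict

def false_positive_rates(alerts):
    """alerts: dicts with 'desk' and a reviewer 'disposition'
    ('false_positive' or 'escalated'). Returns FP rate per desk."""
    totals, fps = defaultdict(int), defaultdict(int)
    for alert in alerts:
        totals[alert["desk"]] += 1
        if alert["disposition"] == "false_positive":
            fps[alert["desk"]] += 1
    return {desk: fps[desk] / totals[desk] for desk in totals}

def volume_drift(current: int, baseline: int, threshold: float = 1.5) -> bool:
    """Flag when alert volume exceeds the baseline by the given ratio."""
    return baseline > 0 and current / baseline > threshold

rates = false_positive_rates([
    {"desk": "rates", "disposition": "false_positive"},
    {"desk": "rates", "disposition": "escalated"},
    {"desk": "fx", "disposition": "false_positive"},
])
```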

Common Pitfalls

  1. Letting the LLM do first-pass detection

    • Bad move. Use rules/features first so the model explains evidence instead of hallucinating risk signals.
    • Fix: keep scoring deterministic and feed only validated context into the agent.
  2. Returning unstructured prose

    • Free text is hard to route into case management systems.
    • Fix: require JSON fields like risk_level, score, reasons, and validate them before storage.
  3. Ignoring governance constraints

    • If prompts contain raw PII or cross-border data without controls, you create compliance problems fast.
    • Fix: redact where possible, log everything needed for auditability, and keep deployment inside approved regions with access controls tied to least privilege.
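
The redaction fix can be as simple as masking identifiers before any text leaves the controlled environment. The `C-`/`CP-` patterns below match the illustrative IDs used in this guide; real systems should redact against your actual identifier formats and keep the unmasked originals only in the access-controlled evidence store:

```python
import re

def redact(prompt_text: str) -> str:
    """Mask client and counterparty identifiers in prompt text.

    The ID patterns here are illustrative; adapt them to your
    bank's identifier scheme.
    """
    prompt_text = re.sub(r"\bC-\d+\b", "[CLIENT]", prompt_text)
    prompt_text = re.sub(r"\bCP-\d+\b", "[COUNTERPARTY]", prompt_text)
    return prompt_text

masked = redact("Client C-44122 traded with CP-7781 on LSE.")
```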

Keep learning

By Cyprian Aarons, AI Consultant at Topiax.
