How to Build a Transaction Monitoring Agent Using CrewAI in Python for Investment Banking
A transaction monitoring agent watches trade and payment activity, flags patterns that look suspicious, and routes cases for human review. In investment banking, that matters because you need to catch market abuse, sanctions exposure, layering, wash trading, and unusual client behavior without drowning compliance teams in false positives.
Architecture
- Ingestion layer
  - Pulls transactions from OMS, FIX gateways, payment rails, or a warehouse.
  - Normalizes records into a common schema: client, instrument, venue, timestamp, amount, counterparty.
- Rules and feature extraction
  - Computes risk features like velocity spikes, round-tripping patterns, notional concentration, and jurisdiction risk.
  - Applies deterministic checks before the LLM ever sees the data.
- CrewAI agent layer
  - Uses a `Crew` with specialized `Agent` roles: triage analyst, policy reviewer, case summarizer.
  - Produces structured assessments instead of free-form chat.
- Compliance evidence store
  - Persists outputs, prompts, model versions, timestamps, and source transaction IDs.
  - Needed for auditability and model governance.
- Escalation workflow
  - Sends high-risk cases to investigators via ticketing or case management.
  - Keeps human-in-the-loop approval for anything that can affect filings or client action.
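The compliance evidence store can start as an append-only table of records like the one below. This is a minimal sketch; the field names, the placeholder model version, and the SHA-256 checksum scheme are assumptions, not a prescribed format.

```python
import hashlib
import json
from datetime import datetime, timezone

def evidence_record(tx_id: str, prompt: str, model_version: str, agent_output: str) -> dict:
    """Build one append-only audit record; field names are illustrative."""
    payload = {
        "transaction_id": tx_id,
        "prompt": prompt,
        "model_version": model_version,
        "agent_output": agent_output,
        "recorded_at": datetime.now(timezone.utc).isoformat(),
    }
    # Checksum over the record contents so later tampering is detectable.
    payload["checksum"] = hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode()
    ).hexdigest()
    return payload

rec = evidence_record(
    "TX-100928", "triage prompt v3", "model-2025-01", '{"risk_level": "high"}'
)
```

Writing these records at every agent step, rather than only at the end, is what makes the audit trail reconstructable later.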
Implementation
1) Install dependencies and define the transaction schema
Start with a narrow schema. In banking systems, garbage in means noisy alerts out.
```shell
pip install crewai pydantic pandas
```

```python
from pydantic import BaseModel
from typing import List

class Transaction(BaseModel):
    transaction_id: str
    client_id: str
    instrument: str
    venue: str
    amount_usd: float
    jurisdiction: str
    timestamp: str
    counterparty: str
    flags: List[str] = []
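Upstream feeds rarely arrive in this shape, so the ingestion layer needs a mapping step before validation. Here is a minimal, stdlib-only sketch; the raw field names (`txnId`, `acct`, `mic`, and so on) are hypothetical stand-ins for whatever your OMS or payment rail actually emits.

```python
from datetime import datetime, timezone

# Hypothetical raw record from an upstream OMS feed; field names are assumptions.
raw = {
    "txnId": "TX-100928",
    "acct": "C-44122",
    "symbol": "EURUSD_SWAP",
    "mic": "XLON",
    "usd_notional": "12400000",
    "ctry": "GB",
    "ts_epoch": 1745230500,
    "cpty": "CP-7781",
}

def normalize(raw: dict) -> dict:
    """Map an upstream record onto the common transaction schema."""
    return {
        "transaction_id": raw["txnId"],
        "client_id": raw["acct"],
        "instrument": raw["symbol"],
        "venue": raw["mic"],
        "amount_usd": float(raw["usd_notional"]),
        "jurisdiction": raw["ctry"],
        "timestamp": datetime.fromtimestamp(raw["ts_epoch"], tz=timezone.utc)
                             .strftime("%Y-%m-%dT%H:%M:%SZ"),
        "counterparty": raw["cpty"],
        "flags": [],
    }

tx = normalize(raw)
```

The normalized dict can then be passed through `Transaction(**tx)` so pydantic rejects anything malformed before it reaches the rules layer.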
2) Create tools for rule checks and evidence retrieval
Keep deterministic checks outside the model. The agent should explain findings, not invent them.
```python
import json

from crewai.tools import tool

@tool("rule_based_risk_score")
def rule_based_risk_score(transaction_json: str) -> str:
    """
    Score a transaction using simple AML/market abuse heuristics.
    Input must be a JSON string representing a transaction.
    """
    tx = json.loads(transaction_json)
    score = 0
    reasons = []
    if tx["amount_usd"] > 5_000_000:
        score += 30
        reasons.append("High notional value")
    if tx["jurisdiction"] in ["IR", "KP", "SY"]:
        score += 50
        reasons.append("High-risk jurisdiction")
    if "rapid_reversal" in tx.get("flags", []):
        score += 40
        reasons.append("Rapid reversal pattern")
    return json.dumps({"score": score, "reasons": reasons})

@tool("fetch_policy_excerpt")
def fetch_policy_excerpt(topic: str) -> str:
    """
    Return a short internal compliance policy excerpt by topic.
    Replace with your document store or vector search.
    """
    policies = {
        "sanctions": "Escalate any exposure involving sanctioned jurisdictions or entities.",
        "market_abuse": "Review repeated same-day buy/sell activity and suspicious layering patterns.",
        "recordkeeping": "Retain alert rationale, source data references, and reviewer actions.",
    }
    return policies.get(topic.lower(), "No policy excerpt found.")
```
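Because the scoring logic is deterministic, it can be unit-tested on its own before any CrewAI wiring exists. The sketch below reimplements the same three checks without the `@tool` wrapper so it runs with the standard library alone:

```python
def score_transaction(tx: dict) -> dict:
    """Same heuristics as rule_based_risk_score, minus the CrewAI wrapper."""
    score, reasons = 0, []
    if tx["amount_usd"] > 5_000_000:
        score += 30
        reasons.append("High notional value")
    if tx["jurisdiction"] in {"IR", "KP", "SY"}:
        score += 50
        reasons.append("High-risk jurisdiction")
    if "rapid_reversal" in tx.get("flags", []):
        score += 40
        reasons.append("Rapid reversal pattern")
    return {"score": score, "reasons": reasons}

# High notional (30) plus rapid reversal (40) should score 70.
result = score_transaction({
    "amount_usd": 12_400_000,
    "jurisdiction": "GB",
    "flags": ["rapid_reversal"],
})
```

Keeping a plain-function version of each tool also makes it easy to replay historical transactions through the rules during model governance reviews.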
3) Build the CrewAI agents and task flow
Use one agent to triage risk and another to produce an auditable summary. The output should be structured enough for downstream systems.
```python
from crewai import Agent, Task, Crew, Process

triage_agent = Agent(
    role="Transaction Triage Analyst",
    goal="Identify suspicious transaction patterns using rules and compliance policy.",
    backstory=(
        "You work in an investment bank's surveillance team. "
        "You must be precise, conservative, and audit-friendly."
    ),
    tools=[rule_based_risk_score, fetch_policy_excerpt],
    verbose=True,
)

summary_agent = Agent(
    role="Case Summary Writer",
    goal="Produce a concise investigation summary for compliance reviewers.",
    backstory=(
        "You write case notes for AML and market surveillance teams. "
        "You never speculate beyond available evidence."
    ),
)

triage_task = Task(
    description=(
        "Review this transaction JSON: {transaction_json}. "
        "Call rule_based_risk_score first. Then consult policy excerpts if needed. "
        "Return JSON with keys: risk_level, score, reasons, recommended_action."
    ),
    expected_output="Valid JSON only.",
    agent=triage_agent,
)

summary_task = Task(
    description=(
        "Using the triage result and original transaction JSON, write an investigator summary "
        "with fields: case_summary, key_risks, audit_notes."
    ),
    expected_output="Structured summary suitable for case management.",
    agent=summary_agent,
)

crew = Crew(
    agents=[triage_agent, summary_agent],
    tasks=[triage_task, summary_task],
    process=Process.sequential,
    verbose=True,
)
```
4) Run the agent on real transactions and persist the result
In production you would batch this from Kafka or your warehouse. Here’s the execution pattern you actually want.
```python
sample_tx = Transaction(
    transaction_id="TX-100928",
    client_id="C-44122",
    instrument="EURUSD_SWAP",
    venue="LSE",
    amount_usd=12_400_000,
    jurisdiction="GB",
    timestamp="2026-04-21T10:15:00Z",
    counterparty="CP-7781",
    flags=["rapid_reversal"],
)

result = crew.kickoff(inputs={"transaction_json": sample_tx.model_dump_json()})
print(result)
```
That gives you an auditable workflow:
- deterministic scoring first,
- policy lookup second,
- structured output last.

If you need stricter control over output format in a real deployment:

- validate the final string with `json.loads()`
- reject malformed responses
- store both raw output and parsed output in your case archive
Production Considerations
- Deployment boundaries
  - Run the agent inside your bank's approved environment.
  - Keep data residency aligned with jurisdictional requirements; do not ship client trade data to unmanaged external services.
- Audit trail
  - Persist input payloads, tool outputs, final decisions, prompt versions, model versions, and timestamps.
  - Regulators will care more about traceability than clever prompts.
- Human approval gates
  - Never auto-file SAR/STR actions from an agent alone.
  - Use the agent to recommend escalation; let compliance approve final action.
- Monitoring
  - Track false positive rate by desk, asset class, region, and client segment.
  - Alert on drift when alert volumes spike after model or policy changes.
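The drift check on alert volumes can start as a simple trailing-baseline test per segment. A minimal sketch, assuming daily alert counts and a three-sigma threshold; real deployments would use a proper change-point or seasonality-aware method.

```python
from statistics import mean, pstdev

def volume_spike(daily_alert_counts: list[int], threshold_sigmas: float = 3.0) -> bool:
    """Flag drift when today's alert count sits more than `threshold_sigmas`
    standard deviations above the trailing baseline."""
    *baseline, today = daily_alert_counts
    mu, sigma = mean(baseline), pstdev(baseline)
    # Guard against a zero-variance baseline.
    return today > mu + threshold_sigmas * max(sigma, 1e-9)

# Roughly 40 alerts/day all week, then 120 after a policy change.
history = [41, 38, 44, 40, 39, 42, 120]
```

Running this per desk, asset class, region, and client segment, rather than on the global total, is what catches localized drift before it distorts the overall numbers.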
Common Pitfalls
- Letting the LLM do first-pass detection
  - Bad move. Use rules/features first so the model explains evidence instead of hallucinating risk signals.
  - Fix: keep scoring deterministic and feed only validated context into the agent.
- Returning unstructured prose
  - Free text is hard to route into case management systems.
  - Fix: require JSON fields like `risk_level`, `score`, and `reasons`, and validate them before storage.
- Ignoring governance constraints
  - If prompts contain raw PII or cross-border data without controls, you create compliance problems fast.
  - Fix: redact where possible, log everything needed for auditability, and keep deployment inside approved regions with access controls tied to least privilege.
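For the redaction step, a first pass can mask direct identifiers before text enters a prompt or log line. The patterns below match this article's illustrative `C-...` and `CP-...` ID formats and are assumptions; a real deployment would use the bank's approved tokenization service, not regexes.

```python
import re

# Illustrative ID patterns only; adjust to your firm's identifier schemes.
CLIENT_ID = re.compile(r"\bC-\d{4,}\b")
COUNTERPARTY_ID = re.compile(r"\bCP-\d{3,}\b")

def redact(text: str) -> str:
    """Mask direct identifiers before the text reaches a prompt or a log."""
    text = CLIENT_ID.sub("[CLIENT]", text)
    return COUNTERPARTY_ID.sub("[COUNTERPARTY]", text)

note = "Client C-44122 traded 12.4M USD with counterparty CP-7781."
```

The unredacted original stays in the access-controlled evidence store, so investigators can still resolve the masked IDs when a case escalates.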
Keep learning
- The complete AI Agents Roadmap: my full 8-step breakdown
- Free: The AI Agent Starter Kit, a PDF checklist plus starter code
- Work with me: I build AI for banks and insurance companies
By Cyprian Aarons, AI Consultant at Topiax.