How to Integrate Haystack for payments with Elasticsearch for multi-agent systems

By Cyprian AaronsUpdated 2026-04-21
haystack-for-paymentselasticsearchmulti-agent-systems

Combining Haystack for payments with Elasticsearch gives you a clean pattern for payment-aware multi-agent systems: one agent handles retrieval and reasoning, while another can fetch, validate, and act on payment-related data. The practical win is simple — you can route payment workflows, search transaction history, and ground agent decisions in indexed operational data instead of brittle prompts.

Prerequisites

  • Python 3.10+
  • An Elasticsearch cluster running locally or in your environment
  • API credentials for Haystack for payments
  • pip access to install the required SDKs
  • A service account or API key with permission to:
    • read payment records
    • create/search Elasticsearch indexes
    • call the Haystack payment APIs from your agent runtime

Install dependencies:

pip install elasticsearch haystack-ai

If your Haystack for payments package is distributed separately in your environment, install that SDK too.

Integration Steps

  1. Connect to Elasticsearch and create a payment index

Start by creating an index that stores normalized payment events. In multi-agent systems, this becomes the shared source of truth for retrieval agents.

from elasticsearch import Elasticsearch

es = Elasticsearch(
    "http://localhost:9200",
    basic_auth=("elastic", "changeme"),
)

index_name = "payment_events"

if not es.indices.exists(index=index_name):
    es.indices.create(
        index=index_name,
        mappings={
            "properties": {
                "payment_id": {"type": "keyword"},
                "customer_id": {"type": "keyword"},
                "status": {"type": "keyword"},
                "amount": {"type": "double"},
                "currency": {"type": "keyword"},
                "created_at": {"type": "date"},
                "notes": {"type": "text"},
            }
        },
    )
  1. Ingest payment records from Haystack for payments into Elasticsearch

Use Haystack for payments as the upstream system that emits or exposes payment objects. Map those objects into documents before indexing.

from datetime import datetime, timezone

# Example payload returned by your Haystack for payments workflow or API call
payment = {
    "id": "pay_1001",
    "customer_id": "cust_42",
    "status": "captured",
    "amount": 149.99,
    "currency": "USD",
    "notes": "Subscription renewal",
}

doc = {
    "payment_id": payment["id"],
    "customer_id": payment["customer_id"],
    "status": payment["status"],
    "amount": payment["amount"],
    "currency": payment["currency"],
    "created_at": datetime.now(timezone.utc).isoformat(),
    "notes": payment["notes"],
}

es.index(index=index_name, id=payment["id"], document=doc)
es.indices.refresh(index=index_name)

If your Haystack for payments SDK exposes a client like PaymentsClient, use it to fetch the canonical record before indexing:

# Example pattern; replace with your actual Haystack for payments client initialization.
from haystack_payments import PaymentsClient

payments = PaymentsClient(api_key="hp_live_xxx")

payment_record = payments.transactions.get("pay_1001")
es.index(index=index_name, id=payment_record.id, document={
    "payment_id": payment_record.id,
    "customer_id": payment_record.customer_id,
    "status": payment_record.status,
    "amount": float(payment_record.amount),
    "currency": payment_record.currency,
    "created_at": payment_record.created_at.isoformat(),
    "notes": payment_record.memo,
})
  1. Build a retrieval tool for your agents using Elasticsearch search

Your agent should not query raw transactions directly. Wrap Elasticsearch search in a narrow tool so the orchestration layer controls what each agent can see.

def search_payments(query: str, customer_id: str | None = None):
    must_clauses = [
        {"multi_match": {"query": query, "fields": ["notes", "status", "payment_id"]}}
    ]

    if customer_id:
        must_clauses.append({"term": {"customer_id": customer_id}})

    response = es.search(
        index=index_name,
        size=5,
        query={"bool": {"must": must_clauses}},
    )

    return [
        hit["_source"]
        for hit in response["hits"]["hits"]
    ]

results = search_payments("subscription renewal", customer_id="cust_42")
print(results)
  1. Wire the retrieval tool into a Haystack agent pipeline

In Haystack, you typically connect tools to an agent or pipeline so the model can decide when to retrieve indexed context. The exact component names vary by version, but the pattern is stable: define a tool around search_payments() and expose it to the agent.

from haystack import Pipeline
from haystack.components.builders import PromptBuilder

template = """
You are a payments operations assistant.
Use retrieved payment records to answer the question.

Question: {{question}}
Context:
{% for item in context %}
- {{ item.payment_id }} | {{ item.status }} | {{ item.amount }} {{ item.currency }}
{% endfor %}
"""

prompt_builder = PromptBuilder(template=template)

def retrieve_context(question: str):
    return search_payments(question)

pipeline = Pipeline()
pipeline.add_component("prompt_builder", prompt_builder)

question = "Find captured subscription renewals for cust_42"
context = retrieve_context(question)
result = prompt_builder.run(question=question, context=context)

print(result["prompt"])
  1. Add a multi-agent handoff pattern

For multi-agent systems, keep one agent focused on retrieval and another on actioning outcomes like refunds or dispute checks. Use Elasticsearch results as shared memory between agents.

def reconciliation_agent(query: str):
    matches = search_payments(query)
    return {
        "matches_count": len(matches),
        "matches": matches,
        "recommended_action":
            "refund_review" if any(m["status"] == "failed" for m in matches) else "no_action"
    }

def operations_agent(agent_output: dict):
    if agent_output["recommended_action"] == "refund_review":
        return f"Escalate {agent_output['matches_count']} failed payments to ops."
    return f"No action required for {agent_output['matches_count']} matches."

state = reconciliation_agent("failed card payments")
print(operations_agent(state))

Testing the Integration

Run an end-to-end check: write one record to Elasticsearch, retrieve it through your search wrapper, then pass it through your agent logic.

test_payment = {
    "payment_id": "__test_001__",
    "customer_id": "__test_customer__",
    "status": "captured",
    "amount": 19.99,
    "currency": "USD",
    "created_at": datetime.now(timezone.utc).isoformat(),
    "notes": "[TEST] monthly plan",
}

es.index(index=index_name, id=test_payment["payment_id"], document=test_payment)
es.indices.refresh(index=index_name)

hits = search_payments("monthly plan", customer_id="__test_customer__")
print(f"hits={len(hits)}")
print(hits[0]["payment_id"])

Expected output:

hits=1
__test_001__

Real-World Use Cases

  • Payment support copilot

    • A support agent searches indexed transactions in Elasticsearch while Haystack generates grounded responses about status, retries, and settlement timing.
  • Fraud triage workflow

    • One agent retrieves suspicious patterns from Elasticsearch; another agent uses Haystack-based reasoning to summarize risk signals and route cases.
  • Reconciliation assistant

    • Batch jobs index ledger events into Elasticsearch, then agents compare failed captures, duplicate charges, and missing settlements across systems.

Keep learning

By Cyprian Aarons, AI Consultant at Topiax.

Want the complete 8-step roadmap?

Grab the free AI Agent Starter Kit — architecture templates, compliance checklists, and a 7-email deep-dive course.

Get the Starter Kit

Related Guides