How to Integrate Haystack for wealth management with Elasticsearch for RAG

By Cyprian Aarons. Updated 2026-04-21.
Haystack for wealth management gives you the orchestration layer for agentic workflows, while Elasticsearch gives you the retrieval layer that actually scales under real document volume. Put them together and you get a RAG system that can answer portfolio, policy, suitability, and client-service questions with low latency and auditable retrieval.

Prerequisites

  • Python 3.10+
  • An Elasticsearch cluster running locally or in your cloud environment
  • A Haystack installation that includes your wealth-management components
  • API keys or credentials for:
    • Elasticsearch
    • Your embedding model provider
    • Your LLM provider
  • A document set to index:
    • client policy docs
    • product sheets
    • investment memos
    • compliance FAQs

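Install the core packages. The elasticsearch-haystack package provides the haystack_integrations imports used in step 4:

pip install haystack-ai elasticsearch-haystack elasticsearch sentence-transformers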

If your wealth-management components ship as a separate package, install that too. The name below is a placeholder; substitute the name your distribution actually uses:

pip install haystack-wealth-management

Integration Steps

1) Connect to Elasticsearch

Start by creating a client and confirming the cluster is reachable. Keep this separate from your pipeline code so connection failures are obvious during deployment.

from elasticsearch import Elasticsearch

es = Elasticsearch(
    "http://localhost:9200",
    basic_auth=("elastic", "changeme"),
    request_timeout=30,
)

print(es.info())

For production, use TLS and secret-backed credentials. Do not hardcode passwords in the app.
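
One way to follow that advice is to read connection settings from the environment and fail fast when a secret is missing. The sketch below is a minimal illustration, not a prescribed setup: the variable names ES_URL, ES_USERNAME, ES_PASSWORD, and ES_CA_CERTS are assumptions made for this example, and the helper only builds keyword arguments so it can be checked without a running cluster.

```python
import os


def es_client_kwargs_from_env(env=os.environ):
    """Build keyword arguments for Elasticsearch(...) from environment variables.

    The variable names (ES_URL, ES_USERNAME, ES_PASSWORD, ES_CA_CERTS) are
    illustrative assumptions, not names required by any library.
    """
    url = env.get("ES_URL", "https://localhost:9200")
    password = env.get("ES_PASSWORD")
    if not password:
        # Fail fast so a missing secret shows up at startup, not mid-request.
        raise RuntimeError("ES_PASSWORD is not set")

    kwargs = {
        "hosts": url,
        "basic_auth": (env.get("ES_USERNAME", "elastic"), password),
        "request_timeout": 30,
    }
    ca_certs = env.get("ES_CA_CERTS")
    if ca_certs:
        # Path to the CA bundle used to verify the cluster's TLS certificate.
        kwargs["ca_certs"] = ca_certs
    return kwargs


# Usage (requires the elasticsearch package and a reachable cluster):
# from elasticsearch import Elasticsearch
# es = Elasticsearch(**es_client_kwargs_from_env())
```

Keeping the lookup in one function means the pipeline code never touches raw credentials, and a deployment with a missing secret stops before it serves any queries.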

2) Create an index for RAG documents

Use a dedicated index for wealth-management content. Store both raw text and vector embeddings so retrieval can support semantic search.

index_name = "wealth_rag_docs"

if not es.indices.exists(index=index_name):
    es.indices.create(
        index=index_name,
        mappings={
            "properties": {
                "content": {"type": "text"},
                "title": {"type": "text"},
                "doc_type": {"type": "keyword"},
                "client_segment": {"type": "keyword"},
                "embedding": {
                    "type": "dense_vector",
                    "dims": 384,
                    "index": True,
                    "similarity": "cosine",
                },
            }
        },
    )

Match dims to your embedding model. If you use all-MiniLM-L6-v2, 384 is correct.
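
A dimension mismatch usually surfaces as an indexing error, so it can help to check vectors before they are sent. The helper below is a hypothetical guard written for this guide (check_embedding_dims is not part of Elasticsearch or Haystack). With sentence-transformers, model.get_sentence_embedding_dimension() reports the size to compare against.

```python
def check_embedding_dims(mappings, vector, field="embedding"):
    """Raise ValueError if the vector length differs from the mapped dims."""
    expected = mappings["properties"][field]["dims"]
    if len(vector) != expected:
        raise ValueError(
            f"{field!r} expects {expected} dimensions, got {len(vector)}"
        )
    return expected


# Same shape as the mapping passed to es.indices.create above.
mappings = {"properties": {"embedding": {"type": "dense_vector", "dims": 384}}}
check_embedding_dims(mappings, [0.0] * 384)  # passes silently
```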

3) Write documents into Elasticsearch with embeddings

Haystack pipelines typically use a document store plus an embedder. If your wealth-management package wraps Haystack components, keep the flow the same: convert source docs into Haystack Document objects, embed them, then write them to Elasticsearch.

from sentence_transformers import SentenceTransformer
from haystack import Document

model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

docs = [
    Document(
        content="This portfolio is restricted from high-volatility instruments due to client risk profile.",
        meta={"title": "Suitability Policy", "doc_type": "policy", "client_segment": "retail"},
    ),
    Document(
        content="Advisers must confirm KYC refresh every 12 months for active discretionary accounts.",
        meta={"title": "KYC Procedure", "doc_type": "compliance", "client_segment": "private_wealth"},
    ),
]

for doc in docs:
    embedding = model.encode(doc.content).tolist()
    es.index(
        index=index_name,
        document={
            "content": doc.content,
            **doc.meta,
            "embedding": embedding,
        },
    )

es.indices.refresh(index=index_name)

At this point Elasticsearch is your retriever backend. Haystack will sit on top of it to orchestrate query flow and answer generation.
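
The loop above sends one request per document and lets Elasticsearch assign random ids, so re-running it creates duplicates. For larger document sets, a possible alternative is to build bulk actions with ids derived from the content. This is a sketch under assumptions: the record shape (a dict with "content", "meta", and "embedding" keys) and the to_bulk_actions helper are invented for this example, while elasticsearch.helpers.bulk is the client's own bulk helper.

```python
import hashlib


def to_bulk_actions(index_name, records):
    """Yield bulk actions with deterministic ids derived from the content.

    Each record is a dict with "content", "meta" and "embedding" keys; that
    shape is an assumption made for this sketch.
    """
    for record in records:
        doc_id = hashlib.sha256(record["content"].encode("utf-8")).hexdigest()
        yield {
            "_index": index_name,
            "_id": doc_id,  # same content -> same id, so re-runs overwrite
            "_source": {
                "content": record["content"],
                **record["meta"],
                "embedding": record["embedding"],
            },
        }


# Usage (requires the elasticsearch package and a reachable cluster):
# from elasticsearch.helpers import bulk
# bulk(es, to_bulk_actions("wealth_rag_docs", records))
```

Hashing the content is only one choice of id. If your source system already has stable document identifiers, those are usually a better key.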

4) Build the Haystack RAG pipeline with Elasticsearch retrieval

In Haystack, wire a retriever against your Elasticsearch-backed document store, then pass retrieved context into a generator. If you’re using a wealth-management-specific component set, this is where policy checks, client segmentation filters, or product constraints usually plug in.

from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator

# Example imports; adjust if your wealth-management package exposes custom wrappers.
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchEmbeddingRetriever

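document_store = ElasticsearchDocumentStore(
    hosts="http://localhost:9200",
    index=index_name,
    # Extra keyword arguments are forwarded to the Elasticsearch client,
    # so pass the same credentials used in step 1.
    basic_auth=("elastic", "changeme"),
)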

retriever = ElasticsearchEmbeddingRetriever(document_store=document_store)
prompt_builder = PromptBuilder(
    template="""
You are a wealth management assistant.
Use only the provided documents.

Question: {{question}}

Documents:
{% for doc in documents %}
- {{ doc.content }}
{% endfor %}

Answer:
"""
)

# OpenAIGenerator reads the API key from the OPENAI_API_KEY environment variable by default.
generator = OpenAIGenerator(model="gpt-4o-mini")

pipe = Pipeline()
pipe.add_component("retriever", retriever)
pipe.add_component("prompt_builder", prompt_builder)
pipe.add_component("llm", generator)

pipe.connect("retriever.documents", "prompt_builder.documents")
pipe.connect("prompt_builder.prompt", "llm.prompt")

If your query path needs metadata filters, pass them at retrieval time. That is how you keep retail answers out of private-banking context and vice versa.
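
Haystack 2.x expresses filters as nested dictionaries with field, operator, and value keys. The helper below is a small sketch of how a segment filter could be assembled. segment_filter is a hypothetical function, and the field names mirror the metadata used in this guide. Whether the "meta." prefix resolves to your stored fields depends on how the documents were indexed, so verify it against your own index.

```python
def segment_filter(client_segment, doc_types=None):
    """Build a Haystack 2.x filter dict for one client segment.

    Field names mirror the metadata used in this guide.
    """
    conditions = [
        {"field": "meta.client_segment", "operator": "==", "value": client_segment}
    ]
    if doc_types:
        # Optionally narrow the search to specific document types.
        conditions.append(
            {"field": "meta.doc_type", "operator": "in", "value": list(doc_types)}
        )
    return {"operator": "AND", "conditions": conditions}


retail_policy_filter = segment_filter("retail", doc_types=["policy"])
# Passed at run time as:
# {"retriever": {"query_embedding": ..., "filters": retail_policy_filter}}
```

Building the filter from the caller's entitlements, rather than from anything in the user's question, keeps the segment boundary out of the model's hands.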

5) Run a filtered query end to end

Embed the query with the same model you used at index time, so the retriever ranks by semantic similarity rather than keyword matching alone.

query = "What restrictions apply to high-volatility investments?"
query_embedding = model.encode(query).tolist()

result = pipe.run({
    "retriever": {
        "query_embedding": query_embedding,
        # Optional metadata filter (Haystack 2.x filter syntax):
        # "filters": {"field": "meta.client_segment", "operator": "==", "value": "retail"},
    },
    "prompt_builder": {"question": query},
})

print(result["llm"]["replies"][0])
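
For auditable answers it helps to show which documents the reply was grounded in. Pipeline.run accepts include_outputs_from={"retriever"}, which adds the retriever's documents to the result. The formatter below is a hypothetical helper that assumes the pipeline was run that way and that each document carries a "title" in its metadata, as in this guide.

```python
def answer_with_sources(result):
    """Return the first reply followed by the titles of the retrieved documents.

    Expects result["retriever"]["documents"] to be present, which assumes the
    pipeline was run with include_outputs_from={"retriever"}.
    """
    reply = result["llm"]["replies"][0]
    documents = result.get("retriever", {}).get("documents", [])
    lines = [reply, "", "Sources:"]
    for doc in documents:
        title = doc.meta.get("title", "untitled")
        lines.append(f"- {title} (score={doc.score})")
    return "\n".join(lines)


# Usage, assuming the pipeline and inputs defined above:
# result = pipe.run(inputs, include_outputs_from={"retriever"})
# print(answer_with_sources(result))
```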

Testing the Integration

Run a direct Elasticsearch sanity check first, then verify Haystack can retrieve relevant context.

query_embedding = model.encode("What restrictions apply to high-volatility investments?").tolist()

resp = es.search(
    index=index_name,
    knn={
        "field": "embedding",
        "query_vector": query_embedding,
        "k": 3,
        "num_candidates": 10,
    },
    _source=["content", "title", "doc_type"],
)

for hit in resp["hits"]["hits"]:
    print(hit["_source"]["title"], "-", hit["_source"]["content"])

Expected output (with only the two sample documents indexed, both come back, most similar first):

Suitability Policy - This portfolio is restricted from high-volatility instruments due to client risk profile.
KYC Procedure - Advisers must confirm KYC refresh every 12 months for active discretionary accounts.

If that returns relevant results, your retrieval layer is working. If Haystack still fails, check prompt wiring and embedding dimension mismatches first.
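
To turn the manual check into something repeatable, you could assert that a known document appears in the hits. This is a minimal sketch: assert_title_in_hits is a hypothetical helper, and sample_resp is a hand-written stand-in for the response shape returned by es.search, not real cluster output.

```python
def assert_title_in_hits(resp, expected_title):
    """Fail loudly if expected_title is missing from the search hits."""
    titles = [hit["_source"].get("title") for hit in resp["hits"]["hits"]]
    assert expected_title in titles, f"{expected_title!r} not in {titles}"
    return titles


# Stand-in for the response shape returned by es.search(...) above.
sample_resp = {
    "hits": {"hits": [{"_source": {"title": "Suitability Policy", "content": "..."}}]}
}
assert_title_in_hits(sample_resp, "Suitability Policy")
```

Against a live cluster, pass the real resp object instead of the stand-in. It supports the same dictionary-style access.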

Real-World Use Cases

  • Advisor copilot
    • Answer questions like “Can this client buy structured notes?” using policy docs, suitability rules, and product metadata.
  • Compliance Q&A
    • Let internal staff ask natural-language questions over AML/KYC procedures with source-grounded answers.
  • Client service assistant
    • Retrieve account-service policies, fee schedules, and investment restrictions before generating responses for relationship managers.

By Cyprian Aarons, AI Consultant at Topiax.
