# Building Custom Vector Database Operators for NVIDIA RAG


## Overview

This advanced notebook demonstrates how to create and integrate custom vector database (VDB) operators with the NVIDIA RAG blueprint. It guides you through building your own VDB implementations that work with the `NvidiaRAG` and `NvidiaRAGIngestor` components.

## Key Topics Covered

- **VDB Operator Architecture** – Understanding the `VDBRag` base class and required interfaces
- **Custom Implementation** – Building a complete [OpenSearch VDB](https://opensearch.org/) operator from scratch
- **Integration Patterns** – Connecting your custom VDB with NVIDIA RAG pipelines
- **Best Practices** – Patterns for error handling and lifecycle management


## Prerequisites

- NVIDIA NGC API key for accessing models
- Docker for running dependent services (OpenSearch, MinIO, NIMs, etc.)
- Python environment with NVIDIA RAG package installed
- Basic understanding of vector databases and RAG concepts


**Note:** This pattern can be adapted for any vector database by implementing the same interface methods.
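
To make the shape of such an interface concrete, here is a minimal, self-contained sketch of a toy in-memory store that mirrors a few of the method names used later in this notebook (`create_collection`, `check_collection_exists`, `write_to_index`, `retrieval`). The class `InMemoryVDB`, its simplified signatures, and its cosine-similarity search are hypothetical illustrations. It deliberately does not inherit from `VDBRag` and is not part of the `nvidia_rag` package.

```python
import math
from typing import Any


class InMemoryVDB:
    """Toy store mirroring some operator method names (hypothetical, for illustration only)."""

    def __init__(self) -> None:
        # Maps a collection name to its list of {"text", "vector", "source"} records
        self._collections: dict[str, list[dict[str, Any]]] = {}

    def create_collection(self, collection_name: str, dimension: int = 2048) -> None:
        # `dimension` is accepted for signature parity; this toy store does not enforce it
        self._collections.setdefault(collection_name, [])

    def check_collection_exists(self, collection_name: str) -> bool:
        return collection_name in self._collections

    def write_to_index(self, collection_name: str, records: list[dict[str, Any]]) -> None:
        self._collections[collection_name].extend(records)

    def retrieval(
        self, collection_name: str, query_vector: list[float], top_k: int = 1
    ) -> list[dict[str, Any]]:
        def cosine(a: list[float], b: list[float]) -> float:
            dot = sum(x * y for x, y in zip(a, b))
            norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
            return dot / norm if norm else 0.0

        # Rank stored records by similarity to the query vector, best first
        ranked = sorted(
            self._collections[collection_name],
            key=lambda record: cosine(query_vector, record["vector"]),
            reverse=True,
        )
        return ranked[:top_k]


vdb = InMemoryVDB()
vdb.create_collection("demo")
vdb.write_to_index(
    "demo",
    [
        {"text": "cats", "vector": [1.0, 0.0], "source": "a.txt"},
        {"text": "dogs", "vector": [0.0, 1.0], "source": "b.txt"},
    ],
)
top = vdb.retrieval("demo", query_vector=[0.9, 0.1], top_k=1)
print(top[0]["text"])
```

A real operator replaces the dictionary with calls to the database client, but the division of responsibilities (collection management, writes, retrieval) stays the same.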

## Setup for NVIDIA RAG Python Package

Refer to [rag_library_usage.ipynb](./rag_library_usage.ipynb) for detailed installation instructions for the RAG library.

**Quick install (in a Python venv):**

```bash
# activate python venv using uv
uv pip install nvidia-rag[all]
```


## Setting up the dependencies

After the Python package environment is set up, launch the dependent services and NIMs that the pipeline relies on.
First complete the [prerequisites here](../docs/deploy-docker-self-hosted.md) to set up Docker on your system.

### 1. Set up the default configurations

In [None]:
!uv pip install python-dotenv
import os
from getpass import getpass

from dotenv import load_dotenv

load_dotenv(dotenv_path=".env_library", override=True)

Provide your `NGC_API_KEY` after executing the cell below. You can obtain a key by following steps [here](../docs/api-key.md).

In [None]:
# os.environ.pop("NGC_API_KEY", None)  ## delete the key and rerun this cell to reset if needed
if os.environ.get("NGC_API_KEY", "").startswith("nvapi-"):
    print("Valid NGC_API_KEY already in environment. Delete to reset")
else:
    candidate_api_key = getpass("NVAPI Key (starts with nvapi-): ")
    assert candidate_api_key.startswith("nvapi-"), (
        f"{candidate_api_key[:5]}... is not a valid key"
    )
    os.environ["NGC_API_KEY"] = candidate_api_key
    os.environ["NVIDIA_API_KEY"] = candidate_api_key
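
Because `assert` statements are skipped when Python runs with the `-O` flag, a reusable check may prefer an explicit exception. A minimal sketch, assuming a hypothetical helper named `validate_nvapi_key` (not part of `nvidia_rag`); the only rule it encodes is the `nvapi-` prefix checked in the cell above:

```python
def validate_nvapi_key(candidate: str) -> str:
    """Return the stripped key, or raise ValueError without echoing the key itself."""
    key = candidate.strip()
    if not key.startswith("nvapi-"):
        raise ValueError("API key must start with 'nvapi-'")
    return key
```

The error message intentionally omits the candidate value so that a mistyped secret does not end up in notebook output.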

Log in to nvcr.io, which is required to pull the dependency containers.

In [None]:
!echo "${NGC_API_KEY}" | docker login nvcr.io -u '$oauthtoken' --password-stdin

### 2. Set up the OpenSearch vector DB using Docker Compose

Follow the steps in the cells below to set up OpenSearch as your vector database:


**2.1. Create Docker Compose Configuration**

Create a `docker-compose-opensearch.yaml` file in the `deploy/compose/` directory by running the cell below.


**[Optional] Advanced Configuration**
- **Memory Settings**: Adjust `OPENSEARCH_JAVA_OPTS` based on your system resources
- **Security**: The security plugin is disabled for simplicity. Enable it for production use
- **Network**: Uses the `nvidia-rag` network for integration with other services
- **Data Persistence**: To keep data persistent, mount the OpenSearch data directory to an external volume with the following steps:
  - Create volume directory and provide required permissions:
    ```bash
    sudo mkdir -p deploy/compose/volumes/opensearch/
    sudo chmod -R 777 deploy/compose/volumes/opensearch/
    ```
  - Mount volume:
    ```yaml
        volumes:
          - ./volumes/opensearch:/usr/share/opensearch/data/
    ```

In [None]:
yaml_file_content = """services:
  opensearch:
    image: opensearchproject/opensearch:3.1.0
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=10g"
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=myStrongPassword123@456
      - DISABLE_SECURITY_PLUGIN=true
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:9200" ]
      interval: 30s
      timeout: 20s
      retries: 3
    profiles: ["opensearch"]

networks:
  default:
    name: nvidia-rag
"""

with open("../deploy/compose/docker-compose-opensearch.yaml", "w") as f:
    f.write(yaml_file_content)

**2.2. Start OpenSearch Service**

In [None]:
# Start OpenSearch with the opensearch profile
# Make sure elasticsearch container is not running (check with `docker ps`), since it uses same port
# ! docker stop <elasticsearch_container_name> # Uncomment this if elasticsearch is running
!docker compose -f ../deploy/compose/docker-compose-opensearch.yaml --profile opensearch up -d
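
OpenSearch may need some time before it accepts connections after `docker compose up` returns. The following is a minimal polling sketch using only the standard library. The function names `fetch_cluster_health` and `wait_until_ready` are hypothetical, and the URL assumes the `9200` port mapping from the compose file above. A single-node cluster commonly reports `yellow` rather than `green`, so both are treated as ready here.

```python
import json
import time
import urllib.request
from typing import Callable


def fetch_cluster_health(url: str = "http://localhost:9200/_cluster/health") -> dict:
    """Fetch the cluster health document as a dict."""
    with urllib.request.urlopen(url, timeout=5) as response:
        return json.load(response)


def wait_until_ready(
    fetch: Callable[[], dict] = fetch_cluster_health,
    attempts: int = 30,
    delay_s: float = 2.0,
) -> str:
    """Poll until the cluster status is green or yellow, then return that status."""
    last_error = None
    for _ in range(attempts):
        try:
            status = fetch().get("status")
            if status in ("green", "yellow"):
                return status
        except OSError as exc:  # urllib's URLError is a subclass of OSError
            last_error = exc
        time.sleep(delay_s)
    raise TimeoutError(f"OpenSearch not ready after {attempts} attempts: {last_error}")
```

Calling `wait_until_ready()` in a cell blocks until the service responds or the attempts run out. Passing a different `fetch` callable makes the loop easy to exercise without a running cluster.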

**2.3. Verify OpenSearch is Running and Healthy**

In [None]:
# Check if OpenSearch is running
!docker ps | grep opensearch

In [None]:
# Check if OpenSearch is healthy
!curl -X GET "localhost:9200/_cluster/health?pretty"

**2.4. Install OpenSearch Python Client**

Install the `opensearch-py` client in your current environment. This allows your Python code to connect to and interact with the OpenSearch service.

In [None]:
# Install OpenSearch Python Client in your environment
!uv pip install opensearch-py # Use `pip install opensearch-py` if you are not using uv

### 3. Set up other dependencies

In [None]:
# Make sure MinIO is running (required for Citations)
!docker compose -f ../deploy/compose/vectordb.yaml --profile minio up -d

### 4. Set up the NIMs

**Option 1: Deploy on-prem models**

Move to Option 2 if you are interested in using NVIDIA-hosted models.

Ensure you meet [the hardware requirements](../docs/support-matrix.md). By default the NIMs are configured to use 2xH100.

In [None]:
# Create the model cache directory
!mkdir -p ~/.cache/model-cache

In [None]:
# Set the MODEL_DIRECTORY environment variable in the Python kernel
import os

os.environ["MODEL_DIRECTORY"] = os.path.expanduser("~/.cache/model-cache")
print("MODEL_DIRECTORY set to:", os.environ["MODEL_DIRECTORY"])

In [None]:
# Configure GPU IDs for the various microservices if needed
os.environ["EMBEDDING_MS_GPU_ID"] = "0"
os.environ["RANKING_MS_GPU_ID"] = "0"
os.environ["YOLOX_MS_GPU_ID"] = "0"
os.environ["YOLOX_GRAPHICS_MS_GPU_ID"] = "0"
os.environ["YOLOX_TABLE_MS_GPU_ID"] = "0"
os.environ["OCR_MS_GPU_ID"] = "0"
os.environ["LLM_MS_GPU_ID"] = "1"

Select the profile name that matches your hardware by following the guidance in the [NIM Model Profile Configuration](../docs/model-profiles.md) section.

In [None]:
os.environ["NIM_MODEL_PROFILE"] = "......" # Populate your profile name as per hardware

In [None]:
# Deploying NIMs - This may take a while as models download. If kernel times out, just rerun this cell.
!USERID=$(id -u) docker compose -f ../deploy/compose/nims.yaml up -d

In [None]:
# Watch the status of running containers (run this cell repeatedly or in a terminal)
!docker ps

Ensure all of the containers below are running and healthy before proceeding:
```output
NAMES                           STATUS
nemoretriever-ranking-ms        Up ... (healthy)
compose-page-elements-1         Up ...
compose-paddle-1                Up ...
compose-graphic-elements-1      Up ...
compose-table-structure-1       Up ...
nemoretriever-embedding-ms      Up ... (healthy)
nim-llm-ms                      Up ... (healthy)
```
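
Checking the container list by eye is easy to get wrong. The sketch below parses tab-separated `name<TAB>status` lines, such as those produced by `docker ps --format '{{.Names}}\t{{.Status}}'`, and reports which expected containers are not ready yet. The helper names `not_ready` and `docker_ps_output` are hypothetical, and the expected names are copied from the listing above. A container counts as ready when its status starts with `Up` and does not mention `unhealthy` or `health: starting`.

```python
import subprocess

EXPECTED_CONTAINERS = [
    "nemoretriever-ranking-ms",
    "compose-page-elements-1",
    "compose-paddle-1",
    "compose-graphic-elements-1",
    "compose-table-structure-1",
    "nemoretriever-embedding-ms",
    "nim-llm-ms",
]


def not_ready(ps_output: str, expected: list[str]) -> list[str]:
    """Return the expected container names that are missing or not yet ready."""
    statuses = {}
    for line in ps_output.splitlines():
        name, sep, status = line.partition("\t")
        if sep:  # skip lines that are not in name<TAB>status form
            statuses[name.strip()] = status.strip()

    pending = []
    for name in expected:
        status = statuses.get(name, "")
        if (
            not status.startswith("Up")
            or "unhealthy" in status
            or "health: starting" in status
        ):
            pending.append(name)
    return pending


def docker_ps_output() -> str:
    """Run `docker ps` and return name<TAB>status lines for running containers."""
    result = subprocess.run(
        ["docker", "ps", "--format", "{{.Names}}\t{{.Status}}"],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout
```

In a notebook cell, `not_ready(docker_ps_output(), EXPECTED_CONTAINERS)` returns an empty list once every expected container is up.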

**Option 2: Using NVIDIA-hosted models**

In [None]:
os.environ["APP_LLM_MODELNAME"] = "nvidia/llama-3.3-nemotron-super-49b-v1.5"
os.environ["APP_EMBEDDINGS_MODELNAME"] = "nvidia/llama-3.2-nv-embedqa-1b-v2"
os.environ["APP_RANKING_MODELNAME"] = "nvidia/llama-3.2-nv-rerankqa-1b-v2"
os.environ["APP_EMBEDDINGS_SERVERURL"] = ""
os.environ["APP_LLM_SERVERURL"] = ""
os.environ["APP_RANKING_SERVERURL"] = (
    "https://ai.api.nvidia.com/v1/retrieval/nvidia/llama-3_2-nv-rerankqa-1b-v2/reranking/v1"
)
os.environ["OCR_HTTP_ENDPOINT"] = "https://ai.api.nvidia.com/v1/cv/baidu/paddleocr"
os.environ["OCR_INFER_PROTOCOL"] = "http"
os.environ["YOLOX_HTTP_ENDPOINT"] = (
    "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-page-elements-v2"
)
os.environ["YOLOX_INFER_PROTOCOL"] = "http"
os.environ["YOLOX_GRAPHIC_ELEMENTS_HTTP_ENDPOINT"] = (
    "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-graphic-elements-v1"
)
os.environ["YOLOX_GRAPHIC_ELEMENTS_INFER_PROTOCOL"] = "http"
os.environ["YOLOX_TABLE_STRUCTURE_HTTP_ENDPOINT"] = (
    "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-table-structure-v1"
)
os.environ["YOLOX_TABLE_STRUCTURE_INFER_PROTOCOL"] = "http"

### 5. Set up the NVIDIA Ingest runtime and Redis service

In [None]:
!docker compose -f ../deploy/compose/docker-compose-ingestor-server.yaml up nv-ingest-ms-runtime redis -d

## Build a Custom VDB Operator - Library Mode

Create a custom Vector Database operator by implementing the `VDBRag` interface, then pass it to `NvidiaRAG`/`NvidiaRAGIngestor`.

Refer to [`docs/change-vectordb.md`](../docs/change-vectordb.md#integrate-in-library-mode-developer-friendly) for detailed steps to integrate your own vector database with NVIDIA RAG in library mode.

**Note:** The implementation below demonstrates integration in library mode. To use server mode instead, refer to [Integrate Into NVIDIA RAG (Server Mode)](../docs/change-vectordb.md#integrate-into-nvidia-rag-server-mode).

### 1. Defining your own VDBRag class for OpenSearch

**Defining helper functions for OpenSearch**

In [None]:
"""
Helper functions for OpenSearch query construction.

This cell provides utility functions that build OpenSearch request bodies for:
- Retrieving all unique document sources (`get_unique_sources_query`)
- Deleting a metadata schema by collection name (`get_delete_metadata_schema_query`)
- Fetching the metadata schema for a specific collection (`get_metadata_schema_query`)
- Deleting documents by source name (`get_delete_docs_query`)
- Creating the index mapping for the metadata schema collection (`create_metadata_collection_mapping`)

These helpers are used by the custom VDB operator to manage and query metadata in OpenSearch.
"""


def get_unique_sources_query():
    """
    Generate aggregation query to retrieve all unique document sources.
    """
    query_unique_sources = {
        "size": 0,
        "aggs": {
            "unique_sources": {
                "composite": {
                    "size": 1000,  # Adjust size depending on number of unique values
                    "sources": [
                        {
                            "source_name": {
                                "terms": {
                                    "field": "metadata.source.source_name.keyword"
                                }
                            }
                        }
                    ],
                },
                "aggs": {
                    "top_hit": {
                        "top_hits": {
                            "size": 1  # Just one document per source_name
                        }
                    }
                },
            }
        },
    }
    return query_unique_sources


def get_delete_metadata_schema_query(collection_name: str):
    """
    Create deletion query for removing metadata schema by collection name.
    """
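    # `collection_name` is mapped as `keyword` (see create_metadata_collection_mapping),
    # so it has no `.keyword` sub-field; query the field directly.
    query_delete_metadata_schema = {
        "query": {"term": {"collection_name": collection_name}}
    }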
    return query_delete_metadata_schema


def get_metadata_schema_query(collection_name: str):
    """
    Build search query to retrieve metadata schema for specified collection.
    """
    query_metadata_schema = {"query": {"term": {"collection_name": collection_name}}}
    return query_metadata_schema


def get_delete_docs_query(source_value: str):
    """
    Construct deletion query for documents matching the source value.
    """
    query_delete_documents = {
        "query": {"term": {"metadata.source.source_name.keyword": source_value}}
    }
    return query_delete_documents


def create_metadata_collection_mapping():
    """Generate the OpenSearch index mapping for metadata schema collections."""
    return {
        "mappings": {
            "properties": {
                "collection_name": {
                    "type": "keyword"  # or "text" depending on your search needs
                },
                "metadata_schema": {
                    "type": "object",  # For JSON-like structure
                    "enabled": True,  # Set to False if you don't want to index its fields
                },
            }
        }
    }
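
The composite aggregation in `get_unique_sources_query` returns at most `size` buckets per request. When more buckets may remain, the response carries an `after_key`, which can be sent back as the `after` parameter to fetch the next page. The following is a minimal sketch of that loop. The names `unique_sources_query` and `iter_unique_sources` are hypothetical, the query is a compact copy of the one above without `top_hits`, and `search` is assumed to be any callable that takes a request body and returns the response dict (for example, a thin wrapper around an OpenSearch client's search call).

```python
import copy
from typing import Any, Callable, Iterator


def unique_sources_query(page_size: int = 1000) -> dict[str, Any]:
    """Compact copy of the notebook's composite aggregation (without top_hits)."""
    return {
        "size": 0,
        "aggs": {
            "unique_sources": {
                "composite": {
                    "size": page_size,
                    "sources": [
                        {
                            "source_name": {
                                "terms": {"field": "metadata.source.source_name.keyword"}
                            }
                        }
                    ],
                }
            }
        },
    }


def iter_unique_sources(
    search: Callable[[dict[str, Any]], dict[str, Any]],
    page_size: int = 1000,
) -> Iterator[str]:
    """Yield every unique source name, following `after_key` across pages."""
    query = unique_sources_query(page_size)
    composite = query["aggs"]["unique_sources"]["composite"]
    while True:
        # Send a copy so the caller's `search` cannot mutate our paging state
        response = search(copy.deepcopy(query))
        agg = response["aggregations"]["unique_sources"]
        for bucket in agg["buckets"]:
            yield bucket["key"]["source_name"]
        after_key = agg.get("after_key")
        if not agg["buckets"] or after_key is None:
            break
        composite["after"] = after_key
```

With this loop, the `size` value controls only the page size, not the total number of sources that can be listed.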

**Defining the OpenSearch `VDBRag` class**

The following cell implements each method of the `VDBRag` class. Review the docstring of each method to understand its behavior and design.

In [None]:
import time
import logging
from typing import Any

from nvidia_rag.utils.vdb.vdb_base import VDBRag
from nvidia_rag.utils.vdb import DEFAULT_METADATA_SCHEMA_COLLECTION

from langchain_core.vectorstores import VectorStore
from langchain_core.documents import Document
from langchain_core.runnables import RunnableAssign, RunnableLambda
from langchain_community.vectorstores import OpenSearchVectorSearch

from nv_ingest_client.util.milvus import cleanup_records
from opentelemetry import context as otel_context

logger = logging.getLogger(__name__)

class OpenSearchVDB(VDBRag):
    """
    OpenSearchVDB is a vector database implementation using OpenSearch for RAG (Retrieval-Augmented Generation) applications.

    This class provides comprehensive functionality for document ingestion, indexing, retrieval, and metadata management
    using OpenSearch as the backend vector database. It supports embedding-based similarity search, metadata filtering,
    and collection management operations.

    Inherits from VDBRag to provide a standardized interface for vector database operations.
    """

    def __init__(
        self,
        opensearch_url="http://localhost:9200",
        index_name="test",
        meta_dataframe=None,
        meta_source_field=None,
        meta_fields=None,
        embedding_model=None
    ):
        """
        Initialize the OpenSearchVDB instance with connection parameters and metadata configuration.

        Args:
            opensearch_url (str, optional): The URL endpoint for the OpenSearch cluster.
                                          Defaults to "http://localhost:9200".
            index_name (str, optional): The name of the OpenSearch index to use for storing documents.
                                      Defaults to "test".
            meta_dataframe (pandas.DataFrame, optional): DataFrame containing metadata information for documents.
                                                       Used for enriching document metadata during ingestion.
            meta_source_field (str, optional): The field name in meta_dataframe that corresponds to document sources.
                                             Used for joining metadata with documents.
            meta_fields (list, optional): List of metadata field names to extract and store with documents.
                                        These fields will be searchable and filterable.
            embedding_model (object, optional): The embedding model instance used for generating vector embeddings.
                                              Must be compatible with langchain embedding interfaces.

        Returns:
            None: This is a constructor method that initializes the instance.

        Raises:
            ConnectionError: If unable to connect to the specified OpenSearch URL.
            ValueError: If invalid parameters are provided for index_name or embedding_model.

        Example:
            >>> from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings
            >>> embedding_model = NVIDIAEmbeddings(model="nvidia/llama-3.2-nv-embedqa-1b-v2")
            >>> vdb = OpenSearchVDB(
            ...     opensearch_url="http://localhost:9200",
            ...     index_name="my_documents",
            ...     embedding_model=embedding_model
            ... )
        """
        self.opensearch_url = opensearch_url
        self.embedding_model = embedding_model
        self.index_name = index_name
        self.opensearch_vs = self.get_langchain_vectorstore(
            collection_name=index_name
        )
        self.meta_dataframe = meta_dataframe
        self.meta_source_field = meta_source_field
        self.meta_fields = meta_fields

    # ----------------------------------------------------------------------------------------------
    @property
    def collection_name(self) -> str:
        """
        Get the current collection name (index name) for the OpenSearch vector database.

        This property provides a standardized interface to access the collection name,
        which is internally stored as index_name in OpenSearch terminology.

        Args:
            None

        Returns:
            str: The current collection/index name being used for document storage and retrieval.

        Example:
            >>> vdb = OpenSearchVDB(index_name="my_collection")
            >>> print(vdb.collection_name)
            'my_collection'
        """
        return self.index_name

    # ----------------------------------------------------------------------------------------------
    @collection_name.setter
    def collection_name(self, collection_name: str) -> None:
        """
        Set the collection name (index name) for the OpenSearch vector database.

        This property setter allows changing the target collection/index for subsequent
        operations. The change affects all future document storage and retrieval operations.

        Args:
            collection_name (str): The new collection/index name to use. Must be a valid
                                 OpenSearch index name (lowercase, no spaces, valid characters).

        Returns:
            None: This is a setter method that modifies the instance state.

        Raises:
            ValueError: If the collection_name contains invalid characters for OpenSearch index names.

        Example:
            >>> vdb = OpenSearchVDB()
            >>> vdb.collection_name = "new_collection"
            >>> print(vdb.collection_name)
            'new_collection'
        """
        self.index_name = collection_name

    # ----------------------------------------------------------------------------------------------
    def create_index(self) -> None:
        """
        Create a new OpenSearch index if it doesn't already exist.

        This method initializes a new OpenSearch index with predefined settings optimized
        for vector similarity search using FAISS engine. The index is created with 2048
        dimensions to accommodate typical embedding models.

        Args:
            None (uses instance attributes)

        Returns:
            None: This method performs index creation as a side effect.

        Raises:
            ConnectionError: If unable to connect to the OpenSearch cluster.
            PermissionError: If insufficient permissions to create indices on the cluster.
            ValueError: If the index_name contains invalid characters.

        Side Effects:
            - Creates a new OpenSearch index if it doesn't exist
            - Configures the index with FAISS engine for vector search
            - Sets dimension to 2048 for vector embeddings

        Example:
            >>> vdb = OpenSearchVDB(index_name="my_new_index")
            >>> vdb.create_index()  # Creates index if not exists
        """
        if not self.check_collection_exists(self.index_name):
            self.opensearch_vs.create_index(
                index_name=self.index_name,
                dimension=2048,
                engine="faiss"
            )

    # ----------------------------------------------------------------------------------------------
    def write_to_index(self, records: list, **kwargs) -> None:
        """
        Write processed records with embeddings to the OpenSearch index.

        This method processes raw records by cleaning them, extracting text and vector embeddings,
        organizing metadata, and then storing everything in the OpenSearch index. The records are
        preprocessed using the cleanup_records utility to ensure consistent formatting.

        Args:
            records (list): List of record dictionaries containing document data. Each record should have:
                          - 'text': The document text content
                          - 'vector': Pre-computed embedding vector (list/array of floats)
                          - 'source': Source identifier for the document
                          - 'content_metadata': Additional metadata as dictionary
            **kwargs: Additional keyword arguments (currently unused but available for extension)

        Returns:
            None: This method performs bulk indexing as a side effect.

        Raises:
            ValueError: If records are malformed or missing required fields.
            ConnectionError: If unable to connect to OpenSearch cluster.
            IndexError: If the target index doesn't exist or is misconfigured.

        Side Effects:
            - Processes and cleans input records using configured metadata settings
            - Bulk inserts documents with embeddings into OpenSearch index
            - Refreshes the index to make documents immediately searchable
            - Logs the number of successfully added records

        Example:
            >>> records = [
            ...     {
            ...         'text': 'Sample document text',
            ...         'vector': [0.1, 0.2, ...],  # 2048-dim embedding
            ...         'source': 'doc1.pdf',
            ...         'content_metadata': {'page': 1, 'section': 'intro'}
            ...     }
            ... ]
            >>> vdb.write_to_index(records)
        """
        # Clean up and flatten records to pull appropriate fields from the records
        cleaned_records = cleanup_records(
            records=records,
            meta_dataframe=self.meta_dataframe,
            meta_source_field=self.meta_source_field,
            meta_fields=self.meta_fields,
        )

        # Prepare texts, embeddings, and metadatas from cleaned records
        texts, embeddings, metadatas = [], [], []
        for cleaned_record in cleaned_records:
            texts.append(cleaned_record.get("text"))
            embeddings.append(cleaned_record.get("vector"))
            metadatas.append(
                {
                    "source": cleaned_record.get("source"),
                    "content_metadata": cleaned_record.get("content_metadata"),
                }
            )

        # Add texts, embeddings, and metadatas to the OpenSearch index
        self.opensearch_vs.add_embeddings(
            text_embeddings=zip(texts, embeddings),
            metadatas=metadatas,
        )
        logger.info(
            "Added %d records to OpenSearch index %s", len(texts), self.index_name
        )
        self.opensearch_vs.client.indices.refresh(index=self.index_name)

    # ----------------------------------------------------------------------------------------------
    def retrieval(self, queries: list, **kwargs) -> list[dict[str, Any]]:
        """
        Retrieve relevant documents from OpenSearch based on input queries.

        This method performs semantic similarity search using vector embeddings to find
        the most relevant documents for the given queries. Currently, this is a placeholder
        method that requires implementation for the specific OpenSearchVDB use case.

        Args:
            queries (list): List of query strings to search for. Each query will be processed
                          to find the most semantically similar documents in the index.
            **kwargs: Additional keyword arguments for retrieval configuration such as:
                    - top_k (int): Number of top results to return per query
                    - filter_expr (dict): Metadata filters to apply during search
                    - threshold (float): Minimum similarity score threshold

        Returns:
            list[dict[str, Any]]: List of retrieved documents with their metadata and scores.
                                Each document dict contains:
                                - 'text': The document content
                                - 'metadata': Document metadata including source info
                                - 'score': Similarity score for the query

        Raises:
            NotImplementedError: This method is currently not implemented and serves as a placeholder.
            ConnectionError: If unable to connect to OpenSearch cluster.
            ValueError: If queries are malformed or empty.

        Example:
            >>> queries = ["What is machine learning?", "How does AI work?"]
            >>> results = vdb.retrieval(queries, top_k=5)
            # Currently raises NotImplementedError
        """
        # Placeholder: implement actual retrieval logic
        raise NotImplementedError("retrieval must be implemented for OpenSearchVDB")

    # ----------------------------------------------------------------------------------------------
    def reindex(self, records: list, **kwargs) -> None:
        """
        Reindex existing documents in the OpenSearch index with updated data or embeddings.

        This method handles the reindexing of documents that already exist in the OpenSearch
        index. It can be used to update document content, refresh embeddings, or modify
        metadata without losing the original document structure.

        Args:
            records (list): List of record dictionaries containing updated document data.
                          Each record should include identifiers to match existing documents.
            **kwargs: Additional keyword arguments for reindexing configuration such as:
                    - batch_size (int): Number of documents to process in each batch
                    - update_embeddings (bool): Whether to regenerate embeddings
                    - preserve_metadata (bool): Whether to keep existing metadata

        Returns:
            None: This method performs reindexing as a side effect.

        Raises:
            NotImplementedError: This method is currently not implemented and serves as a placeholder.
            ConnectionError: If unable to connect to OpenSearch cluster.
            ValueError: If records are malformed or missing required identifiers.
            IndexError: If target documents don't exist in the index.

        Side Effects:
            - Updates existing documents in the OpenSearch index
            - Refreshes embeddings if specified
            - Maintains document version history for conflict resolution

        Example:
            >>> updated_records = [
            ...     {
            ...         'id': 'doc_123',
            ...         'text': 'Updated document content',
            ...         'vector': [0.2, 0.3, ...],  # New embedding
            ...         'metadata': {'version': 2}
            ...     }
            ... ]
            >>> vdb.reindex(updated_records)
            # Currently raises NotImplementedError
        """
        # Placeholder: implement actual reindex logic
        raise NotImplementedError("reindex must be implemented for OpenSearchVDB")

    # ----------------------------------------------------------------------------------------------
    def run(
        self,
        records: list,
    ) -> None:
        """
        Execute the complete document ingestion pipeline for OpenSearch.

        This is the main orchestration method that performs the full workflow of
        document ingestion: creating the index if it doesn't exist, then writing
        all provided records to the index. This method provides a simple interface
        for bulk document processing.

        Args:
            records (list): List of record dictionaries containing document data to ingest.
                          Each record should follow the format expected by write_to_index().

        Returns:
            None: This method performs the complete ingestion workflow as a side effect.

        Raises:
            ConnectionError: If unable to connect to OpenSearch cluster.
            ValueError: If records are malformed or missing required fields.
            PermissionError: If insufficient permissions for index creation or document insertion.

        Side Effects:
            - Creates the target index if it doesn't already exist
            - Bulk inserts all provided records into the OpenSearch index
            - Refreshes the index to make documents immediately searchable
            - Logs the ingestion progress and results

        Example:
            >>> records = [
            ...     {'text': 'Doc 1 content', 'vector': [...], 'source': 'doc1.pdf'},
            ...     {'text': 'Doc 2 content', 'vector': [...], 'source': 'doc2.pdf'}
            ... ]
            >>> vdb.run(records)  # Complete ingestion workflow
        """
        self.create_index()
        self.write_to_index(records)

    # ----------------------------------------------------------------------------------------------
    # Methods for the VDBRag class for ingestion
    async def check_health(self) -> dict[str, Any]:
        """Check OpenSearch database health."""
        status = {
            "service": "Opensearch",
            "url": self.opensearch_url,
            "status": "unknown",
            "error": None,
        }

        if not self.opensearch_url:
            status["status"] = "skipped"
            status["error"] = "No URL provided"
            return status

        try:
            start_time = time.time()

            cluster_health = self.opensearch_vs.client.cluster.health()
            indices = self.opensearch_vs.client.cat.indices(format="json")

            status["status"] = "healthy"
            status["latency_ms"] = round((time.time() - start_time) * 1000, 2)
            status["indices"] = len(indices)
            status["cluster_status"] = cluster_health.get("status", "unknown")

        except ImportError:
            status["status"] = "error"
            status["error"] = (
                "Opensearch client not available (opensearch-py library not installed)"
            )
        except Exception as e:
            status["status"] = "error"
            status["error"] = str(e)

        return status

    def create_collection(
        self,
        collection_name: str,
        dimension: int = 2048,
        collection_type: str = "text",
    ) -> None:
        """
        Create a new collection (index) in OpenSearch with specified configuration.

        This method implements the VDBRag interface for collection creation, providing
        a standardized way to create new document collections with appropriate vector
        search configuration.

        Args:
            collection_name (str): Name of the collection to create. Must be a valid
                                 OpenSearch index name (lowercase, no spaces).
            dimension (int, optional): Dimensionality of the vector embeddings to store.
                                     Defaults to 2048 to match common embedding models.
            collection_type (str, optional): Type of collection being created.
                                           Defaults to "text" for text document storage.

        Returns:
            None: This method creates the collection as a side effect.

        Raises:
            ConnectionError: If unable to connect to OpenSearch cluster.
            PermissionError: If insufficient permissions to create indices.
            ValueError: If collection_name contains invalid characters.

        Side Effects:
            - Creates a new OpenSearch index with vector search capabilities
            - Configures the index for the specified embedding dimensions
            - Sets up FAISS engine for efficient similarity search

        Example:
            >>> vdb.create_collection("my_documents", dimension=768, collection_type="text")
        """
        # Delegates to create_index(); the arguments above are not forwarded to it.
        self.create_index()

    # ----------------------------------------------------------------------------------------------
    def check_collection_exists(
        self,
        collection_name: str,
    ) -> bool:
        """
        Check whether a specified collection (index) exists in OpenSearch.

        This method provides a simple boolean check to determine if a collection
        is already present in the OpenSearch cluster before attempting operations
        that require the collection to exist.

        Args:
            collection_name (str): Name of the collection to check for existence.
                                 Should be a valid OpenSearch index name.

        Returns:
            bool: True if the collection exists, False otherwise.

        Raises:
            ConnectionError: If unable to connect to OpenSearch cluster.
            PermissionError: If insufficient permissions to check index existence.

        Example:
            >>> exists = vdb.check_collection_exists("my_documents")
            >>> if exists:
            ...     print("Collection already exists")
            ... else:
            ...     vdb.create_collection("my_documents")
        """
        return self.opensearch_vs.client.indices.exists(index=collection_name)

    # ----------------------------------------------------------------------------------------------
    def get_collection(self) -> list[dict[str, Any]]:
        """
        Retrieve comprehensive information about all collections in the OpenSearch cluster.

        This method scans the OpenSearch cluster for all available indices (collections),
        retrieves their document counts, and fetches associated metadata schemas to provide
        a complete overview of the vector database contents.

        Args:
            None (operates on the entire OpenSearch cluster)

        Returns:
            list[dict[str, Any]]: List of collection information dictionaries. Each dict contains:
                                - 'collection_name': Name of the collection/index
                                - 'num_entities': Number of documents in the collection
                                - 'metadata_schema': Schema definition for metadata fields

        Raises:
            ConnectionError: If unable to connect to OpenSearch cluster.
            PermissionError: If insufficient permissions to list indices or access metadata.

        Side Effects:
            - Creates metadata schema collection if it doesn't exist
            - Queries all non-hidden indices in the cluster
            - Retrieves metadata schemas for each collection

        Example:
            >>> collections = vdb.get_collection()
            >>> for collection in collections:
            ...     print(f"Collection: {collection['collection_name']}")
            ...     print(f"Documents: {collection['num_entities']}")
            ...     print(f"Schema: {collection['metadata_schema']}")
        """
        self.create_metadata_schema_collection()
        indices = self.opensearch_vs.client.cat.indices(format="json")
        collection_info = []
        for index in indices:
            index_name = index["index"]
            if not index_name.startswith("."):  # Ignore hidden indices
                metadata_schema = self.get_metadata_schema(index_name)
                collection_info.append(
                    {
                        "collection_name": index_name,
                        "num_entities": index["docs.count"],
                        "metadata_schema": metadata_schema,
                    }
                )
        return collection_info

    # ----------------------------------------------------------------------------------------------
    def delete_collections(
        self,
        collection_names: list[str],
    ) -> dict[str, Any]:
        """
        Delete multiple collections (indices) and their associated metadata from OpenSearch.

        This method performs a comprehensive deletion of collections, removing both the
        main document indices and their associated metadata schemas. The deletion is
        performed safely with ignore_unavailable=True to handle missing collections gracefully.

        Args:
            collection_names (list[str]): List of collection names to delete. Each name
                                        should be a valid OpenSearch index name.

        Returns:
            dict: Deletion summary containing:
                - 'message': Status message about the deletion process
                - 'successful': List of successfully deleted collection names
                - 'failed': List of collection names that failed to delete
                - 'total_success': Count of successful deletions
                - 'total_failed': Count of failed deletions

        Raises:
            ConnectionError: If unable to connect to OpenSearch cluster.
            PermissionError: If insufficient permissions to delete indices.

        Side Effects:
            - Deletes the specified collections/indices from OpenSearch
            - Removes associated metadata schemas from the metadata collection
            - Logs the deletion results for auditing purposes
            - Ignores collections that don't exist (fail-safe operation)

        Example:
            >>> result = vdb.delete_collections(["old_docs", "temp_collection"])
            >>> print(f"Deleted: {result['successful']}")
            >>> print(f"Failed: {result['failed']}")
        """
        _ = self.opensearch_vs.client.indices.delete(
            index=",".join(collection_names), ignore_unavailable=True
        )
        deleted_collections, failed_collections = collection_names, []
        logger.info(f"Collections deleted: {deleted_collections}")

        # Delete the metadata schema from the collection
        for collection_name in deleted_collections:
            _ = self.opensearch_vs.client.delete_by_query(
                index=DEFAULT_METADATA_SCHEMA_COLLECTION,
                body=get_delete_metadata_schema_query(collection_name),
            )
        return {
            "message": "Collection deletion process completed.",
            "successful": deleted_collections,
            "failed": failed_collections,
            "total_success": len(deleted_collections),
            "total_failed": len(failed_collections),
        }

    # ----------------------------------------------------------------------------------------------
    def get_documents(
        self,
        collection_name: str,
    ) -> list[dict[str, Any]]:
        """Retrieve all unique documents from the specified collection.

        This method queries the OpenSearch index to find all unique documents based on their
        source names. It aggregates documents by their source and returns metadata information
        for each unique document.

        Args:
            collection_name (str): The name of the collection/index to retrieve documents from.

        Returns:
            list[dict[str, Any]]: A list of dictionaries containing document information.
                Each dictionary has the following structure:
                - document_name (str): The basename of the source file
                - metadata (dict): Dictionary containing metadata fields and their values

        Raises:
            ConnectionError: If unable to connect to the OpenSearch cluster.
            ValueError: If the collection_name is invalid or doesn't exist.
            IndexError: If the aggregation query fails or returns unexpected results.

        Example:
            >>> vdb = OpenSearchVDB(index_name="my_collection")
            >>> documents = vdb.get_documents("my_collection")
            >>> print(documents[0])
            {
                'document_name': 'example.pdf',
                'metadata': {'author': 'John Doe', 'created': '2024-01-01'}
            }
        """
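        # Schema lookup is disabled here, so every returned "metadata" dict is empty.
        # Re-enable the line below to populate metadata from the stored schema.
        # metadata_schema = self.get_metadata_schema(collection_name)
        metadata_schema = []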
        response = self.opensearch_vs.client.search(
            index=collection_name,
            body=get_unique_sources_query(),
        )
        documents_list = []
        for hit in response["aggregations"]["unique_sources"]["buckets"]:
            source_name = hit["key"]["source_name"]
            metadata = (
                hit["top_hit"]["hits"]["hits"][0]["_source"]
                .get("metadata", {})
                .get("content_metadata", {})
            )
            metadata_dict = {}
            for metadata_item in metadata_schema:
                metadata_name = metadata_item.get("name")
                metadata_value = metadata.get(metadata_name, None)
                metadata_dict[metadata_name] = metadata_value
            documents_list.append(
                {
                    "document_name": os.path.basename(source_name),
                    "metadata": metadata_dict,
                }
            )
        return documents_list

    # ----------------------------------------------------------------------------------------------
    def delete_documents(
        self,
        collection_name: str,
        source_values: list[str],
    ) -> bool:
        """Delete documents from a collection by source values.

        This method removes documents from the OpenSearch index that match the provided
        source values. It performs bulk deletion and refreshes the index to ensure
        changes are immediately visible.

        Args:
            collection_name (str): The name of the collection/index to delete documents from.
            source_values (list[str]): List of source identifiers for documents to delete.
                These should match the 'source' field values in the stored documents.

        Returns:
            bool: True if the deletion operation completes successfully, False otherwise.

        Raises:
            ConnectionError: If unable to connect to the OpenSearch cluster.
            ValueError: If collection_name is invalid or source_values is empty.
            PermissionError: If insufficient permissions to delete documents from the index.

        Side Effects:
            - Removes matching documents from the OpenSearch index
            - Refreshes the index to make deletions immediately visible
            - Logs the deletion operations for audit purposes

        Example:
            >>> vdb = OpenSearchVDB(index_name="my_collection")
            >>> sources_to_delete = ["document1.pdf", "document2.pdf"]
            >>> success = vdb.delete_documents("my_collection", sources_to_delete)
            >>> print(f"Deletion successful: {success}")
            True
        """
        for source_value in source_values:
            self.opensearch_vs.client.delete_by_query(
                index=collection_name, body=get_delete_docs_query(source_value)
            )
        self.opensearch_vs.client.indices.refresh(index=collection_name)
        return True

    # ----------------------------------------------------------------------------------------------
    def create_metadata_schema_collection(
        self,
    ) -> None:
        """Initialize the metadata schema storage collection.

        This method creates a dedicated OpenSearch index for storing metadata schemas
        associated with document collections. The schema collection uses a predefined
        mapping optimized for metadata field definitions and collection associations.

        Args:
            None (uses instance attributes for configuration)

        Returns:
            None: This method performs index creation as a side effect.

        Raises:
            ConnectionError: If unable to connect to the OpenSearch cluster.
            PermissionError: If insufficient permissions to create indices.
            ValueError: If the default metadata schema collection name is invalid.

        Side Effects:
            - Creates the DEFAULT_METADATA_SCHEMA_COLLECTION index if it doesn't exist
            - Applies the metadata collection mapping for schema storage
            - Logs the creation status and configuration details

        Note:
            This is typically called once during system initialization. Subsequent calls
            will detect the existing collection and log that it already exists.

        Example:
            >>> vdb = OpenSearchVDB(opensearch_url="http://localhost:9200")
            >>> vdb.create_metadata_schema_collection()
            # Creates the metadata schema storage index
        """
        mapping = create_metadata_collection_mapping()
        if not self.check_collection_exists(collection_name=DEFAULT_METADATA_SCHEMA_COLLECTION):
            self.opensearch_vs.client.indices.create(
                index=DEFAULT_METADATA_SCHEMA_COLLECTION, body=mapping
            )
            logging_message = (
                f"Collection {DEFAULT_METADATA_SCHEMA_COLLECTION} created "
                + f"at {self.opensearch_url} with mapping {mapping}"
            )
            logger.info(logging_message)
        else:
            logging_message = f"Collection {DEFAULT_METADATA_SCHEMA_COLLECTION} already exists at {self.opensearch_url}"
            logger.info(logging_message)

    # ----------------------------------------------------------------------------------------------
    def add_metadata_schema(
        self,
        collection_name: str,
        metadata_schema: list[dict[str, Any]],
    ) -> None:
        """Add metadata schema to an OpenSearch index.

        This method stores or updates the metadata schema definition for a specific
        collection. It first removes any existing schema for the collection, then
        adds the new schema definition to enable consistent metadata handling.

        Args:
            collection_name (str): The name of the collection to associate with this schema.
            metadata_schema (list[dict[str, Any]]): List of metadata field definitions.
                Each dictionary should contain field specifications like:
                - name (str): The metadata field name
                - type (str): The data type (string, integer, date, etc.)
                - description (str): Optional field description

        Returns:
            None: This method stores the schema as a side effect.

        Raises:
            ConnectionError: If unable to connect to the OpenSearch cluster.
            ValueError: If collection_name is invalid or metadata_schema is malformed.
            PermissionError: If insufficient permissions to modify the metadata schema index.

        Side Effects:
            - Removes any existing metadata schema for the collection
            - Stores the new metadata schema in the dedicated schema index
            - Logs the schema addition with details for audit purposes

        Example:
            >>> schema = [
            ...     {"name": "author", "type": "string", "description": "Document author"},
            ...     {"name": "created_date", "type": "date", "description": "Creation date"}
            ... ]
            >>> vdb.add_metadata_schema("my_collection", schema)
            # Schema is now stored and associated with my_collection
        """
        # Delete the metadata schema from the index
        _ = self.opensearch_vs.client.delete_by_query(
            index=DEFAULT_METADATA_SCHEMA_COLLECTION,
            body=get_delete_metadata_schema_query(collection_name),
        )
        # Add the metadata schema to the index
        data = {
            "collection_name": collection_name,
            "metadata_schema": metadata_schema,
        }
        self.opensearch_vs.client.index(index=DEFAULT_METADATA_SCHEMA_COLLECTION, body=data)
        logger.info(
            f"Metadata schema added for the OpenSearch index {collection_name}. Metadata schema: {metadata_schema}"
        )

    # ----------------------------------------------------------------------------------------------
    def get_metadata_schema(
        self,
        collection_name: str,
    ) -> list[dict[str, Any]]:
        """Get the metadata schema for a collection in the OpenSearch index.

        This method retrieves the stored metadata schema definition for a specific
        collection from the dedicated metadata schema index. Returns an empty list
        if no schema is found.

        Args:
            collection_name (str): The name of the collection to retrieve schema for.

        Returns:
            list[dict[str, Any]]: List of metadata field definitions for the collection.
                Each dictionary contains field specifications. Returns empty list if
                no schema is found for the collection.

        Raises:
            ConnectionError: If unable to connect to the OpenSearch cluster.
            ValueError: If collection_name is invalid.
            IndexError: If the metadata schema index is corrupted or inaccessible.

        Side Effects:
            - Queries the metadata schema index
            - Logs informational messages about schema retrieval status

        Example:
            >>> vdb = OpenSearchVDB(opensearch_url="http://localhost:9200")
            >>> schema = vdb.get_metadata_schema("my_collection")
            >>> print(f"Found {len(schema)} metadata fields")
            Found 3 metadata fields
            >>> print(schema[0])
            {"name": "author", "type": "string", "description": "Document author"}
        """
        query = get_metadata_schema_query(collection_name)
        response = self.opensearch_vs.client.search(
            index=DEFAULT_METADATA_SCHEMA_COLLECTION, body=query
        )
        if len(response["hits"]["hits"]) > 0:
            return response["hits"]["hits"][0]["_source"]["metadata_schema"]
        else:
            logging_message = (
                f"No metadata schema found for the collection: {collection_name}."
                + " Possible reason: The collection is not created with metadata schema."
            )
            logger.info(logging_message)
            return []

    # ----------------------------------------------------------------------------------------------
    # Methods for the VDBRag class for retrieval
    def get_langchain_vectorstore(
        self,
        collection_name: str,
    ) -> VectorStore:
        """Get the vectorstore for a collection.

        This method creates and returns a LangChain-compatible OpenSearchVectorSearch
        instance configured for the specified collection. The vectorstore can be used
        directly with LangChain retrievers and chains.

        Args:
            collection_name (str): The name of the collection/index to create vectorstore for.

        Returns:
            VectorStore: A configured OpenSearchVectorSearch instance that implements
                the LangChain VectorStore interface for semantic search operations.

        Raises:
            ConnectionError: If unable to connect to the OpenSearch cluster.
            ValueError: If collection_name is invalid or doesn't exist.
            ImportError: If required LangChain dependencies are not installed.

        Side Effects:
            - Creates a new vectorstore instance (lightweight operation)
            - Validates connection to the specified OpenSearch index

        Example:
            >>> vdb = OpenSearchVDB(opensearch_url="http://localhost:9200",
            ...                     embedding_model=embedding_model)
            >>> vectorstore = vdb.get_langchain_vectorstore("my_collection")
            >>> retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
            >>> docs = retriever.invoke("query text")
        """
        return OpenSearchVectorSearch(
            opensearch_url=self.opensearch_url,
            index_name=collection_name,
            embedding_function=self.embedding_model,
            use_ssl=False,
        )

    # ----------------------------------------------------------------------------------------------
    def retrieval_langchain(
        self,
        query: str,
        collection_name: str,
        vectorstore: OpenSearchVectorSearch = None,
        top_k: int = 10,
        filter_expr: list[dict[str, Any]] | None = None,
        otel_ctx: otel_context = None,
    ) -> list[Document]:
        """Perform semantic search and return top-k relevant documents.

        This method executes semantic similarity search using LangChain's retrieval
        framework. It supports filtering, custom vectorstore instances, and OpenTelemetry
        tracing for observability.

        Args:
            query (str): The search query text to find similar documents for.
            collection_name (str): The name of the collection to search in.
            vectorstore (OpenSearchVectorSearch, optional): Pre-configured vectorstore
                instance. If None, creates a new one for the collection.
            top_k (int, optional): Maximum number of documents to return. Defaults to 10.
            filter_expr (list[dict[str, Any]], optional): List of filter expressions to
                apply during search. Each dict should contain field-value constraints.
            otel_ctx (otel_context, optional): OpenTelemetry context for distributed tracing.

        Returns:
            list[Document]: Retrieved LangChain Document objects with content and
                metadata. Each document includes the collection_name in its
                metadata for multi-collection support.

        Raises:
            ConnectionError: If unable to connect to the OpenSearch cluster.
            ValueError: If query is empty or collection_name is invalid.
            IndexError: If the search operation fails or returns malformed results.

        Side Effects:
            - Executes similarity search against the OpenSearch index
            - Measures and logs retrieval latency for performance monitoring
            - Adds collection_name to each document's metadata
            - Attaches/detaches OpenTelemetry context for tracing

        Example:
            >>> vdb = OpenSearchVDB(opensearch_url="http://localhost:9200")
            >>> filters = [{"term": {"metadata.author": "john_doe"}}]
            >>> docs = vdb.retrieval_langchain(
            ...     query="machine learning concepts",
            ...     collection_name="research_papers",
            ...     top_k=5,
            ...     filter_expr=filters
            ... )
            >>> print(f"Found {len(docs)} relevant documents")
        """
        if vectorstore is None:
            vectorstore = self.get_langchain_vectorstore(collection_name)

        if not filter_expr:
            filter_expr = []

        token = otel_context.attach(otel_ctx)
        start_time = time.time()

        try:
            retriever = vectorstore.as_retriever(
                search_kwargs={"k": top_k, "fetch_k": top_k}
            )
            retriever_lambda = RunnableLambda(
                lambda x: retriever.invoke(x, filter=filter_expr)
            )
            retriever_chain = {"context": retriever_lambda} | RunnableAssign(
                {"context": lambda input: input["context"]}
            )
            retriever_docs = retriever_chain.invoke(query, config={"run_name": "retriever"})
            docs = retriever_docs.get("context", [])

            latency = time.time() - start_time
            logger.info(f"OpenSearchVectorSearch retriever latency: {latency:.4f} seconds")
        finally:
            # Restore the previous OpenTelemetry context even if retrieval raises
            otel_context.detach(token)

        return self._add_collection_name_to_retreived_docs(docs, collection_name)

    # ----------------------------------------------------------------------------------------------
    @staticmethod
    def _add_collection_name_to_retreived_docs(
        docs: list[Document], collection_name: str
    ) -> list[Document]:
        """Add the collection name to the retrieved documents.

        This is done to ensure the collection name is available in the
        metadata of the documents for preparing citations in case of multi-collection retrieval.

        This static utility method enhances document metadata by adding the source
        collection name to each document. This is essential for multi-collection
        RAG scenarios where citations need to identify the source collection.

        Args:
            docs (list[Document]): List of LangChain Document objects retrieved from search.
                Each document should have content and metadata attributes.
            collection_name (str): The name of the collection these documents originated from.
                This will be added to each document's metadata.

        Returns:
            list[Document]: The same list of documents with collection_name added to
                each document's metadata under the "collection_name" key.

        Raises:
            AttributeError: If documents don't have the expected metadata attribute.
            TypeError: If docs is not a list or contains non-Document objects.

        Side Effects:
            - Modifies the metadata of each input document in-place
            - Adds "collection_name" field to document metadata

        Note:
            This is a static method that can be used independently of class instances.
            It's designed to be called after retrieval operations to prepare documents
            for citation generation in multi-collection scenarios.

        Example:
            >>> from langchain_core.documents import Document
            >>> docs = [
            ...     Document(page_content="Text 1", metadata={"source": "doc1.pdf"}),
            ...     Document(page_content="Text 2", metadata={"source": "doc2.pdf"})
            ... ]
            >>> enhanced_docs = OpenSearchVDB._add_collection_name_to_retreived_docs(
            ...     docs, "research_collection"
            ... )
            >>> print(enhanced_docs[0].metadata)
            {"source": "doc1.pdf", "collection_name": "research_collection"}
        """
        for doc in docs:
            doc.metadata["collection_name"] = collection_name
        return docs
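
The methods above reach OpenSearch through only a handful of client calls, so the same contract can be exercised without a running cluster. The following is a minimal sketch, not the `VDBRag` base class itself: `InMemoryVDB` is a hypothetical dictionary-backed stand-in that mirrors three behaviours of `OpenSearchVDB` shown above (replace-on-add for metadata schemas, an empty list when no schema exists, and a deletion summary that treats missing collections as deleted).

```python
from typing import Any


class InMemoryVDB:
    """Hypothetical dictionary-backed stand-in; not part of nvidia_rag."""

    def __init__(self) -> None:
        self._collections: dict[str, list[dict[str, Any]]] = {}
        self._schemas: dict[str, list[dict[str, Any]]] = {}

    def create_collection(self, collection_name: str) -> None:
        # Creating an existing collection is a no-op
        self._collections.setdefault(collection_name, [])

    def check_collection_exists(self, collection_name: str) -> bool:
        return collection_name in self._collections

    def add_metadata_schema(
        self, collection_name: str, metadata_schema: list[dict[str, Any]]
    ) -> None:
        # Assigning the key replaces any earlier schema (delete-then-insert above)
        self._schemas[collection_name] = metadata_schema

    def get_metadata_schema(self, collection_name: str) -> list[dict[str, Any]]:
        # An unknown collection yields an empty schema rather than an error
        return self._schemas.get(collection_name, [])

    def delete_collections(self, collection_names: list[str]) -> dict[str, Any]:
        for name in collection_names:
            # pop(..., None) ignores missing names, like ignore_unavailable=True
            self._collections.pop(name, None)
            self._schemas.pop(name, None)
        return {
            "message": "Collection deletion process completed.",
            "successful": list(collection_names),
            "failed": [],
            "total_success": len(collection_names),
            "total_failed": 0,
        }


vdb = InMemoryVDB()
vdb.create_collection("docs")
vdb.add_metadata_schema("docs", [{"name": "author", "type": "string"}])
vdb.add_metadata_schema("docs", [{"name": "year", "type": "integer"}])
summary = vdb.delete_collections(["docs", "never_created"])
print(summary["total_success"], vdb.check_collection_exists("docs"))
```

A stand-in like this can be useful for unit-testing code that calls the operator, while the OpenSearch-backed class is reserved for integration runs.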

### 2. Initialize the OpenSearch VDBRag operator object

In [None]:
# Embedding Model Setup:
# Here we configure the embedding model, the neural engine that transforms your text into high-dimensional vectors.
# Tip: Choose a model that fits your domain and scale for best results
from nvidia_rag.utils.common import get_config
from nvidia_rag.utils.embedding import get_embedding_model

CONFIG = get_config()
embedding_model = get_embedding_model(
    model=CONFIG.embeddings.model_name,
    url=CONFIG.embeddings.server_url,
    # url="localhost:9080"  # For on-prem embeddings, uncomment this line and comment out the line above
)

In [None]:
# Initialize the Vector Database (VDB) Operator for OpenSearch
# This operator acts as the bridge between your RAG pipeline and the underlying OpenSearch vector store.
# Configure it here to enable semantic search, hybrid retrieval, and seamless document management.
# Tip: Adjust the parameters below to match your OpenSearch deployment and embedding model

opensearch_vdb_op = OpenSearchVDB(
    opensearch_url="http://localhost:9200",
    index_name="test_library",
    embedding_model=embedding_model
)

Next, set the logging level. Use INFO to display basic logs, or DEBUG for full verbosity.

In [None]:
import logging
import os

# Set the log level via environment variable before importing nvidia_rag
# This ensures the package respects our log level setting
LOGLEVEL = logging.WARNING  # Set to INFO, DEBUG, WARNING or ERROR
os.environ["LOGLEVEL"] = logging.getLevelName(LOGLEVEL)

# Configure logging
logging.basicConfig(level=LOGLEVEL, force=True)

# Set log levels for specific loggers after package import
for name in logging.root.manager.loggerDict:
    if name == "nvidia_rag" or name.startswith("nvidia_rag."):
        logging.getLogger(name).setLevel(LOGLEVEL)
    if name == "nv_ingest_client" or name.startswith("nv_ingest_client."):
        logging.getLogger(name).setLevel(LOGLEVEL)
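
The loop above only touches loggers that are already registered in `logging.root.manager.loggerDict`, which is why it runs after the packages have been imported. Here is a small sketch of the same prefix-matching idea, using a made-up package name `demo_pkg` and a hypothetical helper `set_level_for_packages` so that it runs without `nvidia_rag` installed:

```python
import logging


def set_level_for_packages(level: int, packages: tuple[str, ...]) -> list[str]:
    """Apply `level` to every already-registered logger under the given packages."""
    changed = []
    for name in list(logging.root.manager.loggerDict):
        # Match the package itself or a dotted child, never a mere string prefix
        if any(name == pkg or name.startswith(pkg + ".") for pkg in packages):
            logging.getLogger(name).setLevel(level)
            changed.append(name)
    return sorted(changed)


# Loggers are registered when getLogger() is first called, typically at import time
logging.getLogger("demo_pkg")
logging.getLogger("demo_pkg.ingest")
logging.getLogger("demo_pkg_other")  # shares the prefix but is a different package

changed = set_level_for_packages(logging.WARNING, ("demo_pkg",))
print(changed)
```

Matching on `pkg + "."` rather than on the bare prefix keeps unrelated packages with similar names at their own level.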

### 3. Register the OpenSearch VDBRag operator with the `NvidiaRAG()` and `NvidiaRAGIngestor()` classes

In [None]:
from nvidia_rag import NvidiaRAG, NvidiaRAGIngestor

ingestor = NvidiaRAGIngestor(
    vdb_op=opensearch_vdb_op
)

rag = NvidiaRAG(
    vdb_op=opensearch_vdb_op
)

### 4. Using the NVIDIA RAG Library APIs with an OpenSearch Vector Database Operator

**4.1. Create a new collection**

Creates a new collection in the vector database.

In [None]:
response = ingestor.create_collection()
print(response)

**4.2. List all collections**

Retrieves all available collections from the vector database.

In [None]:
response = ingestor.get_collections()
print(response)
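
Each collection entry produced by the operator's `get_collection()` carries `collection_name`, `num_entities`, and `metadata_schema`. The sketch below summarizes such entries. The sample data is invented, and `summarize_collections` is a hypothetical helper. It assumes `num_entities` may arrive as a string, since the value is taken directly from the `docs.count` field of the cat-indices JSON output, so it converts the value before summing. The exact envelope returned by `ingestor.get_collections()` is not shown here, so adapt the lookup to the response you actually receive.

```python
from typing import Any


def summarize_collections(collections: list[dict[str, Any]]) -> dict[str, Any]:
    """Return the total document count and the metadata field names per collection."""
    total = 0
    fields: dict[str, list[str]] = {}
    for entry in collections:
        # Fall back to 0 when the count is missing or empty
        total += int(entry.get("num_entities") or 0)
        fields[entry["collection_name"]] = [
            item["name"] for item in entry.get("metadata_schema", [])
        ]
    return {"total_documents": total, "fields": fields}


# Invented sample data in the shape documented by get_collection()
sample = [
    {"collection_name": "test_library", "num_entities": "42", "metadata_schema": []},
    {
        "collection_name": "research",
        "num_entities": "8",
        "metadata_schema": [{"name": "author", "type": "string"}],
    },
]
summary = summarize_collections(sample)
print(summary)
```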

**4.3. Add a document**

Uploads new documents to the specified collection in the vector database. To update documents that already exist in the collection, call `update_documents()` instead of `upload_documents()`.

In [None]:
response = await ingestor.upload_documents(
    blocking=False,
    split_options={"chunk_size": 512, "chunk_overlap": 150},
    filepaths=[
        "../data/multimodal/woods_frost.docx",
        "../data/multimodal/multimodal_test.pdf",
    ],
    generate_summary=False,
)
task_id = response.get("task_id")
print(response)
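
`split_options` controls how documents are cut into chunks before embedding. The sketch below illustrates what `chunk_size` and `chunk_overlap` mean, using a plain sliding window over a list of tokens. It is not the splitter the ingestion pipeline uses internally, and `split_with_overlap` is a hypothetical helper written only for this illustration.

```python
def split_with_overlap(
    tokens: list[int], chunk_size: int, chunk_overlap: int
) -> list[list[int]]:
    """Split tokens into windows of chunk_size that share chunk_overlap tokens."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    start = 0
    while start < len(tokens):
        chunks.append(tokens[start : start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # this window already reached the end of the input
        start += step
    return chunks


tokens = list(range(1200))  # stand-in for a tokenized document
chunks = split_with_overlap(tokens, chunk_size=512, chunk_overlap=150)
print(len(chunks), [chunk[0] for chunk in chunks])
```

With 1,200 tokens, a window of 512, and an overlap of 150, each window advances by 362 tokens, giving three chunks that start at tokens 0, 362, and 724. A larger overlap preserves more context across chunk boundaries at the cost of more chunks to embed and store.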

**4.4. Check document upload status**

Checks the status of a document upload/update task.

In [None]:
response = await ingestor.status(task_id=task_id)
print(response)
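
Because `upload_documents()` was called with `blocking=False`, the task may still be running when `status()` is first queried. A minimal polling sketch follows. It assumes the status response is a dict with a `state` key whose terminal values are `FINISHED` and `FAILED`; check the response printed above and adjust these names if they differ. `wait_for_task` is a hypothetical helper, and the fake status function exists only to make the example runnable on its own.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable


async def wait_for_task(
    get_status: Callable[..., Awaitable[dict[str, Any]]],
    task_id: str,
    interval: float = 2.0,
    timeout: float = 600.0,
    terminal_states: tuple[str, ...] = ("FINISHED", "FAILED"),  # assumed values
) -> dict[str, Any]:
    """Poll get_status(task_id=...) until a terminal state or the timeout."""
    deadline = time.monotonic() + timeout
    while True:
        status = await get_status(task_id=task_id)
        if status.get("state") in terminal_states:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task {task_id} still running after {timeout}s")
        await asyncio.sleep(interval)


# Fake status source standing in for ingestor.status: pending twice, then finished
_calls = {"n": 0}


async def fake_status(task_id: str) -> dict[str, Any]:
    _calls["n"] += 1
    state = "FINISHED" if _calls["n"] >= 3 else "PENDING"
    return {"task_id": task_id, "state": state}


if __name__ == "__main__":
    final = asyncio.run(wait_for_task(fake_status, "demo-task", interval=0.01))
    print(final)
```

Inside the notebook, where an event loop is already running, the helper would be awaited directly instead of going through `asyncio.run`, for example `final = await wait_for_task(ingestor.status, task_id)`.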

**4.5. [Optional] Update a document in a collection**

To update an existing document in the collection, run the cell below.

In [None]:
response = await ingestor.update_documents(
    blocking=False,
    filepaths=["../data/multimodal/woods_frost.docx"],
    generate_summary=False,
)
print(response)

**4.6. Get documents in a collection**

Retrieves the list of documents uploaded to a collection.

In [None]:
response = ingestor.get_documents()
print(response)

**4.7. Query a document using RAG**

Sends a chat-style query to the RAG system using the specified models and endpoints.

First, check the health of all services that RAG depends on.

In [None]:
import json

health_status_with_deps = await rag.health()
print(json.dumps(health_status_with_deps, indent=2))

Prepare output parser

In [None]:
import base64
import json

from IPython.display import Image, Markdown, display


async def print_streaming_response_and_citations(rag_response):
    """
    Print the streaming response and citations from the RAG response.
    """
    # Check for API errors before processing
    if rag_response.status_code != 200:
        print("Error: ", rag_response.status_code)
        return

    # Extract the streaming generator from the response
    response_generator = rag_response.generator
    first_chunk_data = None
    for chunk in response_generator:
        if chunk.startswith("data: "):
            chunk = chunk[len("data: ") :].strip()
        if not chunk:
            continue
        try:
            data = json.loads(chunk)
        except json.JSONDecodeError as e:
            print(f"JSON decode error: {e}")
            continue
        choices = data.get("choices", [])
        if not choices:
            continue
        # Save the first chunk with citations
        if first_chunk_data is None and data.get("citations"):
            first_chunk_data = data
        # Print streaming text
        delta = choices[0].get("delta", {})
        text = delta.get("content")
        if not text:
            message = choices[0].get("message", {})
            text = message.get("content", "")
        print(text, end="", flush=True)
    print()  # Newline after streaming

    # Display citations after streaming is done
    if first_chunk_data and first_chunk_data.get("citations"):
        citations = first_chunk_data["citations"]
        for idx, citation in enumerate(citations.get("results", [])):
            doc_type = citation.get("document_type", "text")
            content = citation.get("content", "")
            doc_name = citation.get("document_name", f"Citation {idx + 1}")
            display(Markdown(f"**Citation {idx + 1}: {doc_name}**"))
            try:
                image_bytes = base64.b64decode(content)
                display(Image(data=image_bytes))
            except Exception:
                display(Markdown(f"```\n{content}\n```"))
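
To make the chunk handling above concrete, here is a minimal, self-contained sketch of the same parsing steps applied to hand-written sample chunks. The helper name `extract_text` and the sample payloads are hypothetical and exist only for illustration; chunks from a real stream may carry more fields than shown here.

```python
import json


def extract_text(chunk: str) -> str:
    """Return the text carried by one streamed chunk, or "" if there is none.

    Hypothetical helper: it mirrors the parsing steps of the parser above
    without contacting any RAG service.
    """
    # Strip the "data: " prefix if present
    if chunk.startswith("data: "):
        chunk = chunk[len("data: "):]
    chunk = chunk.strip()
    if not chunk:
        return ""
    try:
        data = json.loads(chunk)
    except json.JSONDecodeError:
        return ""
    choices = data.get("choices", [])
    if not choices:
        return ""
    # Prefer the incremental delta, then fall back to a full message
    text = choices[0].get("delta", {}).get("content")
    if not text:
        text = choices[0].get("message", {}).get("content", "")
    return text or ""


# Hand-written sample chunks (assumed shape, for illustration only)
sample_chunks = [
    'data: {"choices": [{"delta": {"content": "Hello"}}]}',
    'data: {"choices": [{"delta": {}, "message": {"content": ", world"}}]}',
    "data: not-json",
    "",
]
streamed_text = "".join(extract_text(c) for c in sample_chunks)
print(streamed_text)
```

Keeping the per-chunk logic in a pure function like this makes it possible to unit test the parsing separately from the display code.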

Call the generate API

In [None]:
await print_streaming_response_and_citations(
    rag.generate(
        messages=[{"role": "user", "content": "What is the price of a hammer?"}],
        use_knowledge_base=True,
        collection_names=["test_library"],  # provide the collection(s) to query
    )
)

**4.8. [Optional] Search for documents**

Performs a search in the vector database for relevant documents.

Define output parser

In [None]:
def print_search_citations(citations):
    """
    Display all citations from the Citations object returned by search().
    Handles base64-encoded images and text.
    """
    if not citations or not hasattr(citations, "results") or not citations.results:
        print("No citations found.")
        return

    for idx, citation in enumerate(citations.results):
        # If using pydantic models, citation fields may be attributes, not dict keys
        doc_type = getattr(citation, "document_type", "text")
        content = getattr(citation, "content", "")
        doc_name = getattr(citation, "document_name", f"Citation {idx + 1}")

        display(Markdown(f"**Citation {idx + 1}: {doc_name}**"))
        try:
            image_bytes = base64.b64decode(content)
            display(Image(data=image_bytes))
        except Exception:
            display(Markdown(f"```\n{content}\n```"))
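
The parser above tries to decode every citation as an image and falls back to text on failure. Because `base64.b64decode` discards non-alphabet characters by default, some plain text can decode without raising. A stricter check is sketched below, under the assumption that image citations are base64-encoded PNG or JPEG data. It validates the input and inspects the leading magic bytes. The helper name `decode_image_or_none` is hypothetical and is not part of the NVIDIA RAG package.

```python
import base64
import binascii

# Leading bytes of the image formats this sketch recognizes (assumption:
# citations carry PNG or JPEG data; extend the tuple for other formats).
_IMAGE_SIGNATURES = (b"\x89PNG\r\n\x1a\n", b"\xff\xd8\xff")


def decode_image_or_none(content: str):
    """Return decoded image bytes, or None if content is not a base64 image."""
    try:
        # validate=True rejects characters outside the base64 alphabet
        raw = base64.b64decode(content, validate=True)
    except (binascii.Error, ValueError):
        return None
    return raw if raw.startswith(_IMAGE_SIGNATURES) else None


# A fake PNG header followed by padding bytes, encoded the way a citation might be
png_like = base64.b64encode(b"\x89PNG\r\n\x1a\n" + b"\x00" * 8).decode()
print(decode_image_or_none(png_like) is not None)
print(decode_image_or_none("The hammer costs ten dollars."))
```

A caller could display `Image(data=...)` when the helper returns bytes and render the content as Markdown otherwise.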

Call the search API

In [None]:
print_search_citations(
    rag.search(
        query="What is the price of a hammer?",
        collection_names=["test_library"],  # provide the collection(s) to query
        reranker_top_k=10,
        vdb_top_k=100,
    )
)
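
The `vdb_top_k` and `reranker_top_k` arguments suggest a two-stage retrieval: a wider candidate set is pulled from the vector database, then a reranker narrows it down. The toy sketch below illustrates that general pattern only; it is not the library's implementation. The function `two_stage_search`, the cosine scoring, and the `keyword_hits` rerank score are all hypothetical stand-ins.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def two_stage_search(query_vec, docs, rerank_score, vdb_top_k, reranker_top_k):
    # Stage 1: cheap vector similarity over the whole collection
    candidates = sorted(
        docs, key=lambda d: cosine(query_vec, d["vector"]), reverse=True
    )[:vdb_top_k]
    # Stage 2: a (typically costlier) score applied to the shortlist only
    reranked = sorted(candidates, key=rerank_score, reverse=True)
    return reranked[:reranker_top_k]


# Toy collection; "keyword_hits" stands in for a reranker's relevance score
docs = [
    {"name": "a", "vector": [1.0, 0.0], "keyword_hits": 0},
    {"name": "b", "vector": [0.9, 0.1], "keyword_hits": 3},
    {"name": "c", "vector": [0.0, 1.0], "keyword_hits": 5},
]
top = two_stage_search(
    [1.0, 0.0], docs, lambda d: d["keyword_hits"], vdb_top_k=2, reranker_top_k=1
)
print([d["name"] for d in top])
```

In this toy data, document `c` has the highest rerank score but never reaches the reranker because it falls outside the first-stage shortlist. The same trade-off is why the first-stage limit is usually set well above the final limit.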

**4.9. [Optional] Retrieve documents summary**

Run this cell only if summary generation was enabled during document upload by setting the `generate_summary` flag to `True`.

In [None]:
response = await rag.get_summary(
    collection_name="test_library",  # provide the collection name
    file_name="woods_frost.docx",
    blocking=False,
    timeout=20,
)
print(response)

**Note:** The APIs below show how to clean up uploaded documents and collections once you no longer need them.

**4.10. Delete documents from a collection**

Deletes documents from the specified collection.

In [None]:
response = ingestor.delete_documents(
    document_names=["../data/multimodal/multimodal_test.pdf"],
)
print(response)

**4.11. Delete collections**

Deletes the specified collection and all its documents from the vector database.

In [None]:
response = ingestor.delete_collections(
    collection_names=["test_library"]
)
print(response)