# NVIDIA RAG Python Package

This notebook demonstrates how to use the NVIDIA RAG Python client for document ingestion, collection management, and querying.

## Installation guide for the Python package

Before running the cells below, follow these steps in your terminal from the project root directory to install the Python package in your environment and launch this notebook:

> **Note**: Python version **3.12 or higher** is supported.

```bash
# 1. Install Python >= 3.12 (the commands below install Python 3.12) and its development headers
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt update
sudo apt install python3.12
sudo apt-get install python3.12-dev

# 2. Install uv
# Follow the instructions at https://docs.astral.sh/uv/getting-started/installation/

# 3. Create a virtual environment with a supported Python version (>= 3.12)
uv venv --python=python3.12

# 4. Activate the virtual environment
source .venv/bin/activate

# 5. Install the package using ONE of the following options
# (Option 1) Build the wheel from source and install the NVIDIA RAG wheel
uv build
uv pip install dist/nvidia_rag-2.4.0.dev0-py3-none-any.whl[all]

# (Option 2) Install the package in editable (development) mode from source
uv pip install -e .[all]

# (Option 3) Install the prebuilt wheel from PyPI. This does not require you to clone the repo.
uv pip install nvidia-rag[all]

# 6. Start the notebook server and open this notebook in a browser
uv pip install jupyterlab
jupyter lab --allow-root --ip=0.0.0.0 --NotebookApp.token='' --port=8889 --no-browser &
# Then open http://<workstation_ip>:8889/lab/tree/notebooks

# 7. Optional: Install only the RAG or only the Ingestor dependencies
uv pip install dist/nvidia_rag-2.4.0.dev0-py3-none-any.whl[rag]
uv pip install dist/nvidia_rag-2.4.0.dev0-py3-none-any.whl[ingest]
```

##### 📝 **Note:**

- Installing with `uv pip install -e .[all]` allows you to make live edits to the `nvidia_rag` source code and have those changes reflected without reinstalling the package.
- **After making changes to the source code, you need to:**
  - Restart the kernel of your notebook server
  - Re-execute the cells `Setup the default configurations` under `Setting up the dependencies` and `Import the packages` under `API usage example`

#### Verify the installation
The location of the package shown in the output of this command should be inside the virtual environment.

Location: `<workspace_path>/rag/.venv/lib/python3.12/site-packages`

In [None]:
!uv pip show nvidia_rag | grep Location
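
The same check can be made from Python. The sketch below is a minimal example that uses only the standard library. The helper name `installed_inside_env` is hypothetical and not part of `nvidia_rag`, and it assumes the distribution is registered under the name `nvidia_rag`, as in the command above.

```python
import sys
from importlib import metadata
from pathlib import Path
from typing import Optional


def installed_inside_env(dist_name: str) -> Optional[bool]:
    """Return True if the distribution lives under sys.prefix, None if it is not installed."""
    try:
        dist = metadata.distribution(dist_name)
    except metadata.PackageNotFoundError:
        return None
    # locate_file("") resolves to the directory that holds the package metadata
    # (normally the site-packages directory of the active environment).
    location = Path(str(dist.locate_file(""))).resolve()
    env_root = Path(sys.prefix).resolve()
    return location == env_root or env_root in location.parents


# True when the package is installed inside the active virtual environment
print(installed_inside_env("nvidia_rag"))
```

A result of `False` suggests the package was installed into a different interpreter than the one running the notebook kernel.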

---

## Setting up the dependencies

After the environment for the Python package is set up, launch the services and NIMs that the pipeline depends on.
Complete the [prerequisites here](../docs/deploy-docker-self-hosted.md) to set up Docker on your system.

### 1. Setup the default configurations

In [None]:
!uv pip install python-dotenv
import os
from getpass import getpass

from dotenv import load_dotenv

Provide your NGC_API_KEY when prompted after executing the cell below. You can obtain a key by following the steps [here](../docs/api-key.md).

In [None]:
# del os.environ["NGC_API_KEY"]  ## delete key and reset if needed
if os.environ.get("NGC_API_KEY", "").startswith("nvapi-"):
    print("Valid NGC_API_KEY already in environment. Delete to reset")
else:
    candidate_api_key = getpass("NVAPI Key (starts with nvapi-): ")
    assert candidate_api_key.startswith("nvapi-"), (
        f"{candidate_api_key[:5]}... is not a valid key"
    )
    os.environ["NGC_API_KEY"] = candidate_api_key

Log in to nvcr.io, which is required for pulling the containers of the dependencies.

In [None]:
!echo "${NGC_API_KEY}" | docker login nvcr.io -u '$oauthtoken' --password-stdin

Load the default values for all the configurations

In [None]:
load_dotenv(dotenv_path=".env_library", override=True)

*💡 **Tip:*** You can override any default configuration value defined in `.env_library` at runtime by setting `os.environ` in the notebook. Reimport the `nvidia_rag` package and restart the NVIDIA Ingest runtime to pick up the updated configurations.

In [None]:
# Example
# os.environ["ENV_VAR_NAME"]="ENV_VAR_VALUE"
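
A minimal sketch of applying several overrides at once is shown below. The helper `override_env` is hypothetical, and `ENV_VAR_NAME` is the same placeholder used in the cell above, not a real setting. Apply overrides after the `load_dotenv(..., override=True)` call, because that call replaces values that are already present in `os.environ`.

```python
import os
from typing import Dict, Optional


def override_env(overrides: Dict[str, str]) -> Dict[str, Optional[str]]:
    """Apply overrides to os.environ and return the previous values (None if unset)."""
    previous = {name: os.environ.get(name) for name in overrides}
    os.environ.update(overrides)
    return previous


# Placeholder name and value, replace with a variable defined in .env_library
previous_values = override_env({"ENV_VAR_NAME": "ENV_VAR_VALUE"})
print(previous_values)
```

Keeping the returned dictionary makes it easy to see which defaults were changed during a session.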

### 2. Setup the Milvus vector DB services
By default, Milvus uses GPU indexing. Ensure you have provided the correct GPU ID.
Note: If you don't have a GPU available, you can switch to CPU-only Milvus by following the instructions in [milvus-configuration.md](../docs/milvus-configuration.md).

In [None]:
os.environ["VECTORSTORE_GPU_DEVICE_ID"] = "0"

In [None]:
!docker compose -f ../deploy/compose/vectordb.yaml up -d
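
The containers can take a little while to accept connections. The sketch below is one way to wait for the Milvus port before continuing. It assumes Milvus listens on `localhost:19530`, the endpoint used in the API cells of this notebook, and the helper `wait_for_port` is hypothetical. An open port only shows that the service accepts TCP connections, not that it is fully initialized.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False after the timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Example usage (uncomment to run; waits up to 60 seconds):
# print(wait_for_port("localhost", 19530))
```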

### 3. Setup the NIMs

#### Option 1: Deploy on-prem models

Move to Option 2 if you are interested in using cloud models.

Ensure you meet [the hardware requirements](../docs/support-matrix.md). By default the NIMs are configured to use 2xH100.

In [None]:
# Create the model cache directory
!mkdir -p ~/.cache/model-cache

In [None]:
# Set the MODEL_DIRECTORY environment variable in the Python kernel
import os

os.environ["MODEL_DIRECTORY"] = os.path.expanduser("~/.cache/model-cache")
print("MODEL_DIRECTORY set to:", os.environ["MODEL_DIRECTORY"])

In [None]:
# Configure GPU IDs for the various microservices if needed
os.environ["EMBEDDING_MS_GPU_ID"] = "0"
os.environ["RANKING_MS_GPU_ID"] = "0"
os.environ["YOLOX_MS_GPU_ID"] = "0"
os.environ["YOLOX_GRAPHICS_MS_GPU_ID"] = "0"
os.environ["YOLOX_TABLE_MS_GPU_ID"] = "0"
os.environ["OCR_MS_GPU_ID"] = "0"
os.environ["LLM_MS_GPU_ID"] = "1"

Select your hardware-specific profile name by following the guidance in the [NIM Model Profile Configuration](../docs/model-profiles.md) section.

In [None]:
os.environ["NIM_MODEL_PROFILE"] = "......" # Populate your profile name as per hardware

In [None]:
# ⚠️ Deploying NIMs - This may take a while as models download. If the kernel times out, just rerun this cell.
!USERID=$(id -u) docker compose -f ../deploy/compose/nims.yaml up -d

In [None]:
# Watch the status of running containers (run this cell repeatedly or in a terminal)
!docker ps

Ensure all of the containers below are running and healthy before proceeding.
```output
NAMES                           STATUS
nemoretriever-ranking-ms        Up ... (healthy)
compose-page-elements-1         Up ...
compose-paddle-1                Up ...
compose-graphic-elements-1      Up ...
compose-table-structure-1       Up ...
nemoretriever-embedding-ms      Up ... (healthy)
nim-llm-ms                      Up ... (healthy)
```
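
Checking the list by eye gets tedious while models download. The sketch below parses the output of `docker ps --format "{{.Names}}\t{{.Status}}"` and reports which required containers are not ready yet. The helper `containers_not_ready` is hypothetical, the container names are taken from the listing above, and the sample text is made up for illustration.

```python
from typing import Dict, List


def containers_not_ready(ps_output: str, required: List[str]) -> List[str]:
    """Return required container names that are missing, not up, or not yet healthy."""
    statuses: Dict[str, str] = {}
    for line in ps_output.splitlines():
        name, sep, status = line.partition("\t")
        if sep:
            statuses[name.strip()] = status.strip()
    not_ready = []
    for name in required:
        status = statuses.get(name, "")
        # Docker reports "(health: starting)" or "(unhealthy)" for failing health checks
        if not status.startswith("Up") or "unhealthy" in status or "health: starting" in status:
            not_ready.append(name)
    return not_ready


# Illustrative sample, not real output
sample = (
    "nim-llm-ms\tUp 2 minutes (health: starting)\n"
    "nemoretriever-embedding-ms\tUp 5 minutes (healthy)\n"
    "compose-paddle-1\tUp 5 minutes\n"
)
required = ["nim-llm-ms", "nemoretriever-embedding-ms", "compose-paddle-1", "nemoretriever-ranking-ms"]
print(containers_not_ready(sample, required))

# To use live data (requires Docker):
# import subprocess
# out = subprocess.run(
#     ["docker", "ps", "--format", "{{.Names}}\t{{.Status}}"],
#     capture_output=True, text=True, check=True,
# ).stdout
# print(containers_not_ready(out, required))
```

An empty list means every required container is up and none is reporting a failing or pending health check.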

#### Option 2: Use NVIDIA-hosted models

In [None]:
os.environ["APP_LLM_MODELNAME"] = "nvidia/llama-3.3-nemotron-super-49b-v1.5"
os.environ["APP_EMBEDDINGS_MODELNAME"] = "nvidia/llama-3.2-nv-embedqa-1b-v2"
os.environ["APP_RANKING_MODELNAME"] = "nvidia/llama-3.2-nv-rerankqa-1b-v2"
os.environ["APP_EMBEDDINGS_SERVERURL"] = ""
os.environ["APP_LLM_SERVERURL"] = ""
os.environ["APP_RANKING_SERVERURL"] = (
    "https://ai.api.nvidia.com/v1/retrieval/nvidia/llama-3_2-nv-rerankqa-1b-v2/reranking/v1"
)
os.environ["OCR_HTTP_ENDPOINT"] = "https://ai.api.nvidia.com/v1/cv/baidu/paddleocr"
os.environ["OCR_INFER_PROTOCOL"] = "http"
os.environ["YOLOX_HTTP_ENDPOINT"] = (
    "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-page-elements-v2"
)
os.environ["YOLOX_INFER_PROTOCOL"] = "http"
os.environ["YOLOX_GRAPHIC_ELEMENTS_HTTP_ENDPOINT"] = (
    "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-graphic-elements-v1"
)
os.environ["YOLOX_GRAPHIC_ELEMENTS_INFER_PROTOCOL"] = "http"
os.environ["YOLOX_TABLE_STRUCTURE_HTTP_ENDPOINT"] = (
    "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-table-structure-v1"
)
os.environ["YOLOX_TABLE_STRUCTURE_INFER_PROTOCOL"] = "http"

### 4. Setup the NVIDIA Ingest runtime and Redis service

In [None]:
!docker compose -f ../deploy/compose/docker-compose-ingestor-server.yaml up nv-ingest-ms-runtime redis -d

### 5. Load optional profiles if needed

In [None]:
# Load accuracy profile
# load_dotenv(dotenv_path='../deploy/compose/accuracy_profile.env', override=True)

# OR load perf profile
# load_dotenv(dotenv_path='../deploy/compose/perf_profile.env', override=True)

---
# API usage example

After setting up the Python package and starting all dependent services, we can run some snippets that showcase the functionality offered by the `nvidia_rag` package.

## Set logging level
First let's set the required logging level. Set to INFO for displaying basic important logs. Set to DEBUG for full verbosity.

In [None]:
import logging
import os

# Set the log level via environment variable before importing nvidia_rag
# This ensures the package respects our log level setting
LOGLEVEL = logging.WARNING  # Set to INFO, DEBUG, WARNING or ERROR
os.environ["LOGLEVEL"] = logging.getLevelName(LOGLEVEL)

# Configure logging
logging.basicConfig(level=LOGLEVEL, force=True)

# Set log levels for specific loggers after package import
for name in logging.root.manager.loggerDict:
    if name == "nvidia_rag" or name.startswith("nvidia_rag."):
        logging.getLogger(name).setLevel(LOGLEVEL)
    if name == "nv_ingest_client" or name.startswith("nv_ingest_client."):
        logging.getLogger(name).setLevel(LOGLEVEL)

## Import the packages
You can import both or either one based on your requirements. `NvidiaRAG()` exposes APIs to interact with the uploaded documents and `NvidiaRAGIngestor()` exposes APIs for document upload and management.

In [None]:
from nvidia_rag import NvidiaRAG, NvidiaRAGIngestor

rag = NvidiaRAG()
ingestor = NvidiaRAGIngestor()

## 1. Create a new collection
Creates a new collection in the vector database.

In [None]:
response = ingestor.create_collection(
    collection_name="test_library",
    vdb_endpoint="http://localhost:19530",
    # [Optional]: Create collection with metadata schema, uncomment to create collection with metadata schemas
    # metadata_schema = [
    #     {
    #         "name": "meta_field_1",
    #         "type": "string",
    #         "description": "Following field would contain the description for the document"
    #     }
    # ]
)
print(response)

## 2. List all collections
Retrieves all available collections from the vector database.

In [None]:
response = ingestor.get_collections(vdb_endpoint="http://localhost:19530")
print(response)

## 3. Add a document
Uploads new documents to the specified collection in the vector database. To update documents that already exist in the collection, call `update_documents()` instead of `upload_documents()`.

In [None]:
response = await ingestor.upload_documents(
    collection_name="test_library",
    vdb_endpoint="http://localhost:19530",
    blocking=False,
    split_options={"chunk_size": 512, "chunk_overlap": 150},
    filepaths=[
        "../data/multimodal/woods_frost.docx",
        "../data/multimodal/multimodal_test.pdf",
    ],
    generate_summary=False,
    # [Optional]: Uncomment to add custom metadata, ensure that the metadata schema is created with the same fields with create_collection
    # custom_metadata=[
    #     {
    #         "filename": "multimodal_test.pdf",
    #         "metadata": {"meta_field_1": "multimodal document 1"}
    #     },
    #     {
    #         "filename": "woods_frost.docx",
    #         "metadata": {"meta_field_1": "multimodal document 2"}
    #     }
    # ]
)
task_id = response.get("task_id")
print(response)
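
To build intuition for `chunk_size` and `chunk_overlap`, the sketch below computes chunk boundaries for a simple sliding window in which each chunk starts `chunk_size - chunk_overlap` units after the previous one. This only illustrates the arithmetic under that assumption. The splitter used by the ingestion pipeline may behave differently (for example, by respecting token or sentence boundaries), and `chunk_spans` is a hypothetical helper.

```python
from typing import List, Tuple


def chunk_spans(n_units: int, chunk_size: int = 512, chunk_overlap: int = 150) -> List[Tuple[int, int]]:
    """Return (start, end) index pairs for a sliding window with overlap."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    stride = chunk_size - chunk_overlap  # 362 with the values used above
    spans = []
    start = 0
    while start < n_units:
        end = min(start + chunk_size, n_units)
        spans.append((start, end))
        if end == n_units:
            break
        start += stride
    return spans


print(chunk_spans(1000))  # [(0, 512), (362, 874), (724, 1000)]
```

A larger overlap repeats more context between neighboring chunks at the cost of producing more chunks per document.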

## 4. Check document upload status
Checks the status of a document upload/update task.

In [None]:
response = await ingestor.status(task_id=task_id)
print(response)
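
Because the upload above runs with `blocking=False`, the status usually needs to be checked more than once. The sketch below is a generic polling helper. `poll_until` is hypothetical, and the `state` field and its values in the commented usage are assumptions about the status response, so inspect the printed response from the cell above and adjust the predicate to match.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def poll_until(
    fetch: Callable[[], Awaitable[Any]],
    is_done: Callable[[Any], bool],
    interval: float = 2.0,
    timeout: float = 300.0,
) -> Any:
    """Call fetch() repeatedly until is_done(result) is true or the timeout expires."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        result = await fetch()
        if is_done(result):
            return result
        if loop.time() >= deadline:
            raise TimeoutError("task did not finish before the timeout")
        await asyncio.sleep(interval)


# Example usage in a notebook cell (field name and values are assumptions):
# final = await poll_until(
#     lambda: ingestor.status(task_id=task_id),
#     lambda r: r.get("state") in {"FINISHED", "FAILED"},
# )
# print(final)
```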

## [Optional] Update a document in a collection
To update an existing document in the specified collection, execute the cell below.

In [None]:
response = await ingestor.update_documents(
    collection_name="test_library",
    vdb_endpoint="http://localhost:19530",
    blocking=False,
    filepaths=["../data/multimodal/woods_frost.docx"],
    generate_summary=False,
)
print(response)

## 5. Get documents in a collection
Retrieves the list of documents uploaded to a collection.

In [None]:
response = ingestor.get_documents(
    collection_name="test_library",
    vdb_endpoint="http://localhost:19530",
)
print(response)

## 6. Query a document using RAG
Sends a chat-style query to the RAG system using the specified models and endpoints.

### Check health of all dependent services

In [None]:
import json

health_status_with_deps = await rag.health()
print(json.dumps(health_status_with_deps, indent=2))

### Prepare output parser

In [None]:
import base64
import json

from IPython.display import Image, Markdown, display


async def print_streaming_response_and_citations(rag_response):
    """
    Print the streaming response and citations from the RAG response.
    """
    # Check for API errors before processing
    if rag_response.status_code != 200:
        print("Error: ", rag_response.status_code)
        return

    # Extract the streaming generator from the response
    response_generator = rag_response.generator
    first_chunk_data = None
    for chunk in response_generator:
        if chunk.startswith("data: "):
            chunk = chunk[len("data: ") :].strip()
        if not chunk:
            continue
        try:
            data = json.loads(chunk)
        except Exception as e:
            print(f"JSON decode error: {e}")
            continue
        choices = data.get("choices", [])
        if not choices:
            continue
        # Save the first chunk with citations
        if first_chunk_data is None and data.get("citations"):
            first_chunk_data = data
        # Print streaming text
        delta = choices[0].get("delta", {})
        text = delta.get("content")
        if not text:
            message = choices[0].get("message", {})
            text = message.get("content", "")
        print(text, end="", flush=True)
    print()  # Newline after streaming

    # Display citations after streaming is done
    if first_chunk_data and first_chunk_data.get("citations"):
        citations = first_chunk_data["citations"]
        for idx, citation in enumerate(citations.get("results", [])):
            doc_type = citation.get("document_type", "text")
            content = citation.get("content", "")
            doc_name = citation.get("document_name", f"Citation {idx + 1}")
            display(Markdown(f"**Citation {idx + 1}: {doc_name}**"))
            try:
                image_bytes = base64.b64decode(content)
                display(Image(data=image_bytes))
            except Exception:
                display(Markdown(f"```\n{content}\n```"))
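
The per-chunk handling inside the parser can be seen in isolation in the sketch below. It mirrors the logic above: strip the `data: ` prefix, parse the JSON, and prefer `delta.content` over `message.content`. The helper `extract_text` is hypothetical and the sample payload is made up for illustration.

```python
import json


def extract_text(chunk: str) -> str:
    """Return the text carried by one streamed chunk, or "" if there is none."""
    if chunk.startswith("data: "):
        chunk = chunk[len("data: "):]
    chunk = chunk.strip()
    if not chunk:
        return ""
    try:
        data = json.loads(chunk)
    except json.JSONDecodeError:
        return ""
    if not isinstance(data, dict):
        return ""
    choices = data.get("choices") or []
    if not choices:
        return ""
    delta = choices[0].get("delta") or {}
    message = choices[0].get("message") or {}
    return delta.get("content") or message.get("content") or ""


# Illustrative payload, not captured from a real response
sample_chunk = 'data: {"choices": [{"delta": {"content": "Ahoy"}}]}\n\n'
print(extract_text(sample_chunk))  # Ahoy
```

Returning an empty string for malformed or empty chunks keeps the streaming loop simple, since the caller can print every result unconditionally.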

### Call the API

In [None]:
await print_streaming_response_and_citations(
    rag.generate(
        messages=[{"role": "user", "content": "What is the price of a hammer?"}],
        use_knowledge_base=True,
        collection_names=["test_library"],
        # embedding_endpoint="localhost:9080", # TODO: Uncomment while using on-prem embeddings
    )
)

## 7. Search for documents
Performs a search in the vector database for relevant documents.

### Define output parser

In [None]:
def print_search_citations(citations):
    """
    Display all citations from the Citations object returned by search().
    Handles base64-encoded images and text.
    """
    if not citations or not hasattr(citations, "results") or not citations.results:
        print("No citations found.")
        return

    for idx, citation in enumerate(citations.results):
        # If using pydantic models, citation fields may be attributes, not dict keys
        doc_type = getattr(citation, "document_type", "text")
        content = getattr(citation, "content", "")
        doc_name = getattr(citation, "document_name", f"Citation {idx + 1}")

        display(Markdown(f"**Citation {idx + 1}: {doc_name}**"))
        try:
            image_bytes = base64.b64decode(content)
            display(Image(data=image_bytes))
        except Exception:
            display(Markdown(f"```\n{content}\n```"))
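
Both parsers decide between image and text by attempting a base64 decode. By default `base64.b64decode` discards characters outside the base64 alphabet, so some plain text can decode without raising an error. The sketch below is a stricter alternative, assuming image citations are PNG or JPEG (an assumption, not something this notebook confirms). `decode_image` is a hypothetical helper.

```python
import base64
import binascii
from typing import Optional

# File signatures ("magic bytes") for PNG and JPEG
_IMAGE_SIGNATURES = (b"\x89PNG\r\n\x1a\n", b"\xff\xd8\xff")


def decode_image(content: str) -> Optional[bytes]:
    """Return decoded image bytes if content is base64 for a PNG/JPEG, else None."""
    try:
        # validate=True rejects input containing non-base64 characters
        raw = base64.b64decode(content, validate=True)
    except (binascii.Error, ValueError):
        return None
    return raw if raw.startswith(_IMAGE_SIGNATURES) else None


print(decode_image("The hammer costs five dollars"))  # None
```

With this helper, a parser could call `display(Image(data=image_bytes))` only when `decode_image(content)` returns bytes, and fall back to Markdown otherwise.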

### Call the API

In [None]:
print_search_citations(
    rag.search(
        query="What is the price of a hammer?",
        collection_names=["test_library"],
        reranker_top_k=10,
        vdb_top_k=100,
        # embedding_endpoint="localhost:9080" # TODO: Uncomment while using on-prem embeddings
        # [Optional]: Uncomment to filter the documents based on the metadata, ensure that the metadata schema is created with the same fields with create_collection
        # filter_expr='content_metadata["meta_field_1"] == "multimodal document 1"'
    )
)
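
When filter values come from variables, building the `filter_expr` string by hand is prone to quoting mistakes. The sketch below produces an expression in the same shape as the commented example above. `metadata_equals` is a hypothetical helper, and it assumes the filter syntax accepts JSON-style double-quoted strings.

```python
import json


def metadata_equals(field: str, value: str) -> str:
    """Build an equality filter on a content_metadata field."""
    quoted_field = json.dumps(field, ensure_ascii=False)
    quoted_value = json.dumps(value, ensure_ascii=False)
    return f"content_metadata[{quoted_field}] == {quoted_value}"


print(metadata_equals("meta_field_1", "multimodal document 1"))
# content_metadata["meta_field_1"] == "multimodal document 1"
```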

## 8. [Optional] Retrieve documents summary
You can execute this cell if summary generation was enabled during document upload by setting the `generate_summary` flag to `True`.

In [None]:
response = await rag.get_summary(
    collection_name="test_library",
    file_name="woods_frost.docx",
    blocking=False,
    timeout=20,
)
print(response)

The APIs below illustrate how to clean up uploaded documents and collections once no further interaction is needed.

## 9. Delete documents from a collection
Deletes documents from the specified collection.

In [None]:
response = ingestor.delete_documents(
    collection_name="test_library",
    document_names=["../data/multimodal/multimodal_test.pdf"],
    vdb_endpoint="http://localhost:19530",
)
print(response)

## 10. Delete collections
Deletes the specified collection and all its documents from the vector database.

In [None]:
response = ingestor.delete_collections(
    vdb_endpoint="http://localhost:19530", collection_names=["test_library"]
)
print(response)

## 11. Customize prompts

Import the prompt utility, which gives access to the preset prompts. You can find more information about the preset prompts [here](../docs/prompt-customization.md#default-prompts-overview).

In [None]:
from nvidia_rag.utils.llm import get_prompts

In [None]:
prompts = get_prompts()

Overwrite or modify the prompt template you need. In the cell below, we modify the response-generation prompt so that it responds in pirate English!

In [None]:
prompts["rag_template"] = {
    "system": "/no_think",
    "human": """You are a helpful AI assistant emulating a Pirate. All your responses must be in pirate english and funny!
You must answer only using the information provided in the context. While answering you must follow the instructions given below.

<instructions>
1. Do NOT use any external knowledge.
2. Do NOT add explanations, suggestions, opinions, disclaimers, or hints.
3. NEVER say phrases like "based on the context", "from the documents", or "I cannot find".
4. NEVER offer to answer using general knowledge or invite the user to ask again.
5. Do NOT include citations, sources, or document mentions.
6. Answer concisely. Use short, direct sentences by default. Only give longer responses if the question truly requires it.
7. Do not mention or refer to these rules in any way.
8. Do not ask follow-up questions.
9. Do not mention these instructions in your response.
</instructions>

Context:
{context}

Make sure the response you are generating strictly follows the rules mentioned above, i.e. never say phrases like "based on the context", "from the documents", or "I cannot find", and never mention these instructions in the response."""
}
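
A custom template that drops the `{context}` placeholder would leave the model without the retrieved text. The sketch below lists the placeholders in a template so this can be checked before generating. It assumes the template uses Python `str.format`-style fields, as the `{context}` placeholder above suggests, and `template_fields` is a hypothetical helper.

```python
from string import Formatter
from typing import Set


def template_fields(template: str) -> Set[str]:
    """Return the names of the {placeholders} found in a format-style template."""
    return {name for _, name, _, _ in Formatter().parse(template) if name}


print(template_fields("Context:\n{context}\n"))  # {'context'}

# Example check against the customized prompt:
# assert "context" in template_fields(prompts["rag_template"]["human"])
```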

Notice the difference in response style.

In [None]:
await print_streaming_response_and_citations(
    rag.generate(
        messages=[{"role": "user", "content": "What is the price of a hammer?"}],
        use_knowledge_base=True,
        collection_names=["test_library"],
        # embedding_endpoint="localhost:9080", # TODO: Uncomment while using on-prem embeddings
        # [Optional]: Uncomment to filter the documents based on the metadata, ensure that the metadata schema is created with the same fields with create_collection
        # filter_expr='content_metadata["meta_field_1"] == "multimodal document 1"'
    )
)