# Evaluating a RAG Pipeline: Answer Accuracy, Context Relevance, and Response Groundedness with Ragas

In this notebook, we will evaluate our RAG system using three key metrics with the [Ragas](https://docs.ragas.io/en/stable/) library. 

Ragas provides a set of evaluation metrics that help you measure the performance of your LLM application objectively.

## Evaluation Metrics

In this notebook, we will use the following three metrics, introduced to Ragas by NVIDIA:
1. **Answer Accuracy**: Measures the agreement between a model’s response and a reference ground truth for a given question.
2. **Context Relevance**: Evaluates whether the retrieved contexts (chunks or passages) are pertinent to the user input.
3. **Response Groundedness**: Measures how well a response is supported or "grounded" by the retrieved contexts. It assesses whether each claim in the response can be found, either wholly or partially, in the provided contexts.
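
As a rough guide to what each metric compares, the sketch below lists the sample fields each one reads, using the field names of the evaluation dataset built later in this notebook (`user_input`, `retrieved_contexts`, `response`, `reference`). The mapping follows the metric descriptions above. The `METRIC_INPUTS` table and the `missing_fields` helper are illustrative names and are not part of Ragas.

```python
# Illustrative mapping (not a Ragas API): which sample fields each metric compares,
# based on the metric descriptions above.
METRIC_INPUTS = {
    "Answer Accuracy": ("user_input", "response", "reference"),
    "Context Relevance": ("user_input", "retrieved_contexts"),
    "Response Groundedness": ("response", "retrieved_contexts"),
}


def missing_fields(sample: dict) -> dict:
    """Return, per metric, the fields that are absent or empty in a sample."""
    report = {}
    for metric, fields in METRIC_INPUTS.items():
        missing = [name for name in fields if not sample.get(name)]
        if missing:
            report[metric] = missing
    return report


# Hypothetical sample used only to exercise the helper
example = {
    "user_input": "What was the company's FY2022 revenue?",
    "retrieved_contexts": [],
    "response": "Revenue was reported in the annual filing.",
    "reference": "",
}
print(missing_fields(example))
```

A sample with all four fields populated yields an empty report, which means every metric has the inputs it needs.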

## Prerequisites

This notebook assumes you are familiar with the RAG system and have both `rag-server` and `ingestor-server` up and running. If they are not running yet, refer to [Get Started](../docs/deploy-docker-self-hosted.md) to start the RAG server.
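
Before continuing, it can help to confirm that both services accept connections. The following is a minimal sketch that only checks whether a TCP port is open; it assumes the services run on `localhost` with the ports used in this notebook's configuration cells (8081 for `rag-server`, 8082 for `ingestor-server`). An open port does not prove the service is healthy, so treat this as a quick sanity check only.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Ports taken from the configuration cells in this notebook; adjust if yours differ
for name, port in {"rag-server": 8081, "ingestor-server": 8082}.items():
    status = "reachable" if is_port_open("localhost", port) else "not reachable"
    print(f"{name} on localhost:{port} is {status}")
```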

## 1. Download Evaluation Documents

First, let's download the FinanceBench dataset to evaluate our RAG system. This dataset includes PDF files with information and reports about publicly traded companies, as well as ground truth question and answer pairs.

We'll clone the repository into our data directory in a subdirectory called `financebench`. The PDFs can be found in the `pdfs` subdirectory.


In [None]:
! git clone https://github.com/patronus-ai/financebench.git ../data/financebench
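
After cloning, a quick check of the directory layout can catch an incomplete download early. This sketch assumes the layout described above (a `pdfs` subdirectory and `data/financebench_open_source.jsonl` under `../data/financebench`); `summarize_financebench` is a hypothetical helper name.

```python
from pathlib import Path


def summarize_financebench(root: str = "../data/financebench") -> dict:
    """Count the PDFs and check for the question/answer file under the clone directory."""
    root_path = Path(root)
    qa_file = root_path / "data" / "financebench_open_source.jsonl"
    return {
        "pdf_count": len(list((root_path / "pdfs").glob("*.pdf"))),
        "qa_file_found": qa_file.is_file(),
    }


print(summarize_financebench())
```

A `pdf_count` of zero or a missing question/answer file suggests that the clone did not complete or that the notebook is running from a different working directory.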

## 2. Ingest Evaluation Documents

For evaluation, we will use the FinanceBench dataset. In the data directory, we have the PDF files for the FinanceBench dataset, as well as the `financebench_open_source.jsonl` file, which includes ground truth question and answer pairs. 

Let's start by creating a collection called `financebench` and uploading the relevant documents.

This process is similar to the `ingestion_api_usage` notebook. First, we'll install the required packages and set up our API connections.

In [None]:
# Installing required Python packages
! pip install aiohttp langchain-nvidia-ai-endpoints ragas httpx

In [None]:
import aiohttp
import os
import json
import glob
import httpx

In [None]:
IPADDRESS = "ingestor-server" if os.environ.get("AI_WORKBENCH", "false") == "true" else "localhost" # Replace this with the correct IP address
INGESTOR_SERVER_PORT = "8082"
INGESTOR_BASE_URL = f"http://{IPADDRESS}:{INGESTOR_SERVER_PORT}"  # Replace with your server URL

async def print_response(response):
    """Helper to print API response."""
    try:
        response_json = await response.json()
        print(json.dumps(response_json, indent=2))
    except (aiohttp.ClientResponseError, json.JSONDecodeError):
        # Fall back to the raw text when the body is not valid JSON
        print(await response.text())


In [None]:
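from typing import Optional

async def create_collection(
    collection_name: str,
    embedding_dimension: int = 2048,
    metadata_schema: Optional[list] = None
):
    """Create a new collection in the vector database."""
    data = {
        "collection_name": collection_name,
        "embedding_dimension": embedding_dimension,
        # Use a fresh list per call; a mutable default argument would be shared across calls
        "metadata_schema": metadata_schema if metadata_schema is not None else []
    }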

    HEADERS = {"Content-Type": "application/json"}

    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(f"{INGESTOR_BASE_URL}/v1/collection", json=data, headers=HEADERS) as response:
                await print_response(response)
        except aiohttp.ClientError as e:
            return 500, {"error": str(e)}

# Create the financebench collection
await create_collection(
    collection_name="financebench",
)

In [None]:
# Get all PDF files from the financebench directory
FILEPATHS = glob.glob(os.path.join("../data/financebench/pdfs", "*.pdf"))

async def upload_documents(collection_name: str = ""):
    """Upload documents to the specified collection."""
    data = {
        "collection_name": collection_name,
        "blocking": False,  # If True, upload is blocking; else async. Status API not needed when blocking
        "split_options": {
            "chunk_size": 512,
            "chunk_overlap": 150
        },
        "generate_summary": False  # Set to True to optionally generate summaries for all documents after ingestion
    }

    form_data = aiohttp.FormData()
    open_files = []

    try:
        # Add all PDF files to the form data, keeping the handles so they can be closed later
        for file_path in FILEPATHS:
            handle = open(file_path, "rb")
            open_files.append(handle)
            form_data.add_field("documents", handle, filename=os.path.basename(file_path), content_type="application/pdf")

        form_data.add_field("data", json.dumps(data), content_type="application/json")

        async with aiohttp.ClientSession() as session:
            async with session.post(f"{INGESTOR_BASE_URL}/v1/documents", data=form_data) as response: # Replace with session.patch for reingesting
                await print_response(response)
                # Return the response JSON for task_id extraction
                return await response.json()
    except aiohttp.ClientError as e:
        print(f"Error uploading documents: {e}")
        return None
    finally:
        # Close every file handle, whether or not the upload succeeded
        for handle in open_files:
            handle.close()

# Store the response and extract task_id
upload_response = await upload_documents(collection_name="financebench")
task_id = upload_response.get("task_id") if upload_response else None
print(f"Extracted task_id: {task_id}")


**⚠️ Note**: During the document ingestion process, two files (`INTEL_2023_8K_dated-2023-08-16.pdf` and `INTEL_2023_8K_dated-2023-02-10.pdf`) may fail to process due to formatting issues. This is expected and can be safely ignored, as it will not affect the evaluation methodology or results. The remaining documents in the dataset are sufficient for comprehensive evaluation.

In [None]:
# This might take a few minutes to complete depending on the number of documents uploaded
async def get_task_status(
    task_id: str
):
    """Fetch and print the current status of an ingestion task."""
    params = {
        "task_id": task_id,
    }

    HEADERS = {"Content-Type": "application/json"}

    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(f"{INGESTOR_BASE_URL}/v1/status", params=params, headers=HEADERS) as response:
                await print_response(response)
        except aiohttp.ClientError as e:
            return 500, {"error": str(e)}

# Use the extracted task_id from the upload_documents response
if task_id:
    await get_task_status(task_id=task_id)
else:
    print("No task_id available. Please run the upload_documents cell first.")
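
Because ingestion can take several minutes, you may prefer to poll rather than rerun the status cell by hand. The sketch below is a generic polling loop and makes no claim about the status API's response schema, which this notebook does not show: `fetch_status` and `is_done` are caller-supplied, hypothetical callables. In practice, `fetch_status` could be a variant of `get_task_status` that returns the parsed JSON instead of printing it.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def poll_until_done(
    fetch_status: Callable[[], Awaitable[dict]],
    is_done: Callable[[dict], bool],
    interval_s: float = 5.0,
    timeout_s: float = 1800.0,
) -> dict:
    """Call fetch_status repeatedly until is_done(status) is true or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = await fetch_status()
        if is_done(status):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task did not finish within {timeout_s} seconds")
        # Yield to the event loop between checks instead of busy-waiting
        await asyncio.sleep(interval_s)
```

The completion test belongs in `is_done` so that the loop stays independent of whatever fields the status response actually contains.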

## 3. Create Dataset for Ragas Evaluation

In `data/financebench/data`, there is a file called `financebench_open_source.jsonl`. This file contains questions about the PDFs, as well as corresponding ground truth answers.

For each ground-truth question and answer pair, we will:
1. Generate an answer from our RAG system
2. Retrieve the relevant document contexts
3. Create a dataset suitable for Ragas evaluation

The answer and context retrieval from the RAG system is similar to the `retriever_api_usage` notebook.


In [None]:
IPADDRESS = "rag-server" if os.environ.get("AI_WORKBENCH", "false") == "true" else "localhost" #Replace this with the correct IP address
RAG_SERVER_PORT = "8081"
RAG_BASE_URL = f"http://{IPADDRESS}:{RAG_SERVER_PORT}"  # Replace with your server URL

generate_url = f"{RAG_BASE_URL}/v1/generate"

async def generate_answer(payload):
    """Generate an answer using the RAG server."""
    rag_response = ""
    citations = []
    is_first_token = True

    async with httpx.AsyncClient(timeout=300.0) as client:
        try:
            async with client.stream("POST", url=generate_url, json=payload) as response:
                # Raise an exception for bad status codes like 4xx or 5xx
                response.raise_for_status()

                # iterate over the response lines
                async for line in response.aiter_lines():
                    if line.startswith("data: "):
                        json_str = line[6:].strip()
                        if not json_str:
                            continue

                        try:
                            data = json.loads(json_str)

                            # --- Extract the response from the RAG server ---
                            message = data.get("choices", [{}])[0].get("message", {}).get("content", "")
                            if message:
                                rag_response += message

                            # --- Extract the citations from the RAG server ---
                            if is_first_token and data.get("citations"):
                                for result in data.get("citations", {}).get("results", []):
                                    description = result.get("metadata", {}).get("description")
                                    if description:
                                        citations.append(description)
                                is_first_token = False

                            finish_reason = data.get("choices", [{}])[0].get("finish_reason")
                            if finish_reason == "stop":
                                return rag_response, citations

                        except json.JSONDecodeError:
                            print(f"Skipping malformed JSON line: {json_str}")
                            continue
        
        except httpx.HTTPStatusError as e:
            print(f"HTTP error occurred: {e.response.status_code} - {e.response.text}")
        except httpx.RequestError as e:
            print(f"An error occurred while requesting {e.request.url!r}: {e}")
        except Exception as e:
            print(f"An error occurred: {e}")

    return rag_response, citations
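
To make the stream handling in `generate_answer` easier to follow, here is a small standalone sketch of the per-line parsing step. It reads only the fields that `generate_answer` reads (`choices[0].message.content` and `choices[0].finish_reason`). The sample lines are hypothetical and were not captured from a real server.

```python
import json

SSE_PREFIX = "data: "


def parse_stream_line(line: str):
    """Return (content, finish_reason) for one streamed line, or None if it carries no JSON object."""
    if not line.startswith(SSE_PREFIX):
        return None
    payload = line[len(SSE_PREFIX):].strip()
    if not payload:
        return None
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None
    # Tolerate a missing or empty "choices" list instead of raising IndexError
    first = (data.get("choices") or [{}])[0]
    content = (first.get("message") or {}).get("content") or ""
    return content, first.get("finish_reason")


# Hypothetical chunks shaped like the fields generate_answer reads
sample_lines = [
    'data: {"choices": [{"message": {"content": "Revenue "}, "finish_reason": null}]}',
    'data: {"choices": [{"message": {"content": "grew."}, "finish_reason": "stop"}]}',
]
parsed = [parse_stream_line(line) for line in sample_lines]
answer = "".join(content for content, _ in parsed)
print(answer, parsed[-1][1])
```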


In [None]:
# Load the question and ground-truth answer pairs from the FinanceBench dataset
with open('../data/financebench/data/financebench_open_source.jsonl', 'r') as file:
    gt_qa_pairs = [json.loads(line) for line in file]

print(f"Loaded {len(gt_qa_pairs)} question-answer pairs from FinanceBench dataset")

dataset = []

# For the purposes of keeping this demo brief, we will only evaluate on 50 questions. 
# You can increase this to the full dataset for more comprehensive results.
n = 50 
print(f"Evaluating on {n} questions...")

for idx, qa_pair in enumerate(gt_qa_pairs[:n]):
    question = qa_pair['question']
    
    print(f"Processing question {idx + 1}/{n}: {question[:100]}...")

    generate_payload = {
        "messages": [
            {
                "role": "user",
                "content": question
            }
        ],
        "use_knowledge_base": True,
        "reranker_top_k": 2,
        "vdb_top_k": 10,
        "vdb_endpoint": "http://milvus:19530",
        "collection_names": ["financebench"],
        "enable_reranker": True,
        "enable_citations": True,
        "stop": [],
        "filter_expr": ''
    }
    
    rag_answer, citations = await generate_answer(generate_payload)

    dataset.append({
        "user_input": question,
        "retrieved_contexts": citations,
        "response": rag_answer,
        "reference": qa_pair['answer'],
    })

print(f"Created dataset with {len(dataset)} entries for evaluation")
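
Since `generate_answer` returns an empty answer and an empty citation list when a request fails, some dataset entries may be incomplete. The sketch below separates such entries so you can inspect them before evaluation. Whether to drop them is a judgment call, because removing failures changes what the final scores represent. `split_complete_samples` is a hypothetical helper name.

```python
def split_complete_samples(samples: list) -> tuple:
    """Separate samples with a response and at least one context from incomplete ones."""
    complete, incomplete = [], []
    for sample in samples:
        has_response = bool((sample.get("response") or "").strip())
        has_contexts = bool(sample.get("retrieved_contexts"))
        if has_response and has_contexts:
            complete.append(sample)
        else:
            incomplete.append(sample)
    return complete, incomplete


# Hypothetical rows in the same shape as the entries appended to `dataset`
rows = [
    {"user_input": "q1", "retrieved_contexts": ["ctx"], "response": "a1", "reference": "r1"},
    {"user_input": "q2", "retrieved_contexts": [], "response": "a2", "reference": "r2"},
    {"user_input": "q3", "retrieved_contexts": ["ctx"], "response": "  ", "reference": "r3"},
]
complete, incomplete = split_complete_samples(rows)
print(f"{len(complete)} complete, {len(incomplete)} incomplete")
```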


## 4. Evaluate with Ragas

In this example, we will use the NVIDIA hosted endpoint for our judge model. To use this endpoint, please provide your NVIDIA API Key below. 

### Rate Limiting Considerations

When you use the public endpoint for the judge LLM, you will likely encounter rate limit errors. The run configuration set later in this section (a single worker and a longer maximum wait between retries) reduces the number of these errors but may not eliminate them.

Alternatively, you can use self-hosted NIM Microservices endpoints to avoid these errors altogether. If you're using a self-hosted NIM, you do not need to provide your API Key.
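
To give a feel for why a larger maximum wait helps with rate limits, the sketch below computes a capped exponential backoff schedule. It is a generic illustration and not Ragas's actual retry implementation. The 1-second base and the retry count are assumptions, and the 120-second cap mirrors the `max_wait` value used in this notebook.

```python
def backoff_schedule(max_retries: int, base_s: float = 1.0, max_wait_s: float = 120.0) -> list:
    """Upper bound on the wait before each retry: doubles each attempt, capped at max_wait_s."""
    return [min(base_s * (2 ** attempt), max_wait_s) for attempt in range(max_retries)]


print(backoff_schedule(max_retries=10))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 120.0, 120.0, 120.0]
```

With a lower cap, later retries would fire sooner and would be more likely to hit the same rate limit again.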

### Getting Your NVIDIA API Key

To generate an API Key:
1. Go to [build.nvidia.com](https://build.nvidia.com/)
2. Click the green "Get API Key" button in the top right corner
3. Paste your key below to save it as an environment variable

### Self-Hosted Option

To deploy the Judge LLM as a NIM on your own infrastructure, follow the instructions [here](https://build.nvidia.com/mistralai/mixtral-8x22b-instruct/deploy).


In [None]:
import os
from getpass import getpass
# del os.environ['NVIDIA_API_KEY']  ## delete key and reset if needed
if os.environ.get("NVIDIA_API_KEY", "").startswith("nvapi-"):
    print("Valid NVIDIA_API_KEY already in environment. Delete to reset")
else:
    candidate_api_key = getpass("NVAPI Key (starts with nvapi-): ")
    assert candidate_api_key.startswith("nvapi-"), (
        f"{candidate_api_key[:5]}... is not a valid key"
    )
    os.environ["NVIDIA_API_KEY"] = candidate_api_key

In [None]:

# Note: Models on build.nvidia.com are rate limited.
# To avoid rate-limit issues, either deploy the judge model locally (self-hosted NIM)
# or use any OpenAI-compatible LLM as the judge for evaluation.
from langchain_nvidia_ai_endpoints.chat_models import ChatNVIDIA

# Initialize the judge LLM for evaluation
# You can use any other model by creating a different chat model object
llm = ChatNVIDIA(model="openai/gpt-oss-120b")

In [None]:
# Create the evaluation dataset from our collected data
from ragas import EvaluationDataset

evaluation_dataset = EvaluationDataset.from_list(dataset)
print(f"Created evaluation dataset with {len(evaluation_dataset)} samples")

In [None]:
# Import the required metrics and evaluation components
from ragas.metrics import AnswerAccuracy, ContextRelevance, ResponseGroundedness
from ragas import evaluate
from ragas.llms import LangchainLLMWrapper

# Wrap the LLM for use with Ragas
evaluator_llm = LangchainLLMWrapper(llm)

In [None]:
from ragas.run_config import RunConfig

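# Send one judge request at a time and allow up to 120 seconds between retries
# to reduce rate-limit errors on the hosted endpoint
custom_config = RunConfig(max_workers=1, max_wait=120)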

In [None]:
# Run the evaluation with our three metrics
print("Starting Ragas evaluation...")
print("This may take several minutes depending on the dataset size.")

results = evaluate(
    dataset=evaluation_dataset,
    metrics=[AnswerAccuracy(), ContextRelevance(), ResponseGroundedness()],
    llm=evaluator_llm, 
    run_config=custom_config
)

print("Evaluation completed!")


## 5. Analyze Results

Finally, let's examine our evaluation results. We'll look at both the overall metrics and individual sample performance.

In [None]:
results

In [None]:
import pandas as pd

# Convert results to a pandas DataFrame for detailed analysis of individual queries
results_df = results.to_pandas()

# 1. Set the option to display ALL columns, preventing the '...'
pd.set_option('display.max_columns', None)

# 2. To prevent long text in cells from being cut off, you can set the column width
pd.set_option('display.max_colwidth', 80)

results_df.head()
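
Beyond viewing the first few rows, it can be useful to summarize each metric and count how many rows were actually scored, since a failed judge call may leave a missing value. The sketch below works on plain row dictionaries, for example `results_df.to_dict("records")`. The metric column name is hypothetical here, so read the real names from `results_df.columns`.

```python
import math
from statistics import mean


def summarize_scores(rows: list, metric_columns: list) -> dict:
    """Mean score and count of scored rows per metric, skipping missing or NaN values."""
    summary = {}
    for column in metric_columns:
        scores = [
            row[column]
            for row in rows
            if isinstance(row.get(column), (int, float)) and not math.isnan(row[column])
        ]
        summary[column] = {
            "mean": mean(scores) if scores else None,
            "scored": len(scores),
            "total": len(rows),
        }
    return summary


# Hypothetical rows; "example_metric" stands in for a real metric column name
demo_rows = [
    {"example_metric": 1.0},
    {"example_metric": 0.5},
    {"example_metric": float("nan")},
    {},
]
print(summarize_scores(demo_rows, ["example_metric"]))
```

Comparing `scored` with `total` shows how many samples contributed to each mean, which matters when interpreting results from a rate-limited judge.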