# Ingestion API Usage

This notebook demonstrates how to interact with the ingestion APIs to upload and index documents for retrieval-augmented generation (RAG) applications. It walks through the APIs needed to create a collection in the Milvus vector database and upload documents to it, as well as the APIs for managing uploaded documents and existing collections.



- Before running the notebook, make sure the ingestor-server container is running by following the steps in [Get Started](../docs/deploy-docker-self-hosted.md).
- Replace `BASE_URL` with the actual server URL if the API is hosted on another system.
- Replace the directory path (`../data/multimodal`) with the location of your dataset.


#### 1. Install Dependencies and Import Required Modules

In [None]:
!pip install aiohttp
import json
import os

import aiohttp

#### 2. Setup Base Configuration

In [None]:
IPADDRESS = (
    "ingestor-server"
    if os.environ.get("AI_WORKBENCH", "false") == "true"
    else "localhost"
)  # Replace this with the correct IP address
INGESTOR_SERVER_PORT = "8082"
BASE_URL = f"http://{IPADDRESS}:{INGESTOR_SERVER_PORT}"  # Replace with your server URL


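async def print_response(response):
    """Helper to print an API response as formatted JSON, falling back to raw text."""
    try:
        response_json = await response.json()
        print(json.dumps(response_json, indent=2))
    except (aiohttp.ClientResponseError, json.JSONDecodeError):
        # Non-JSON content type (ContentTypeError) or a malformed JSON body
        print(await response.text())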

#### 3. Health Check Endpoint

**Purpose:**
This endpoint performs a health check on the server. It returns a 200 status code if the server is operational.
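The ingestor-server container may need some time to start, so it can help to retry the health check before running the later cells. The following is a minimal standard-library sketch; `http_probe` and `wait_until_healthy` are hypothetical helpers, not part of the ingestion API, and the default retry count and delay are arbitrary.

```python
import time
import urllib.request
from typing import Callable


def http_probe(url: str, timeout: float = 5.0) -> bool:
    """Return True if a GET request to `url` answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        # URLError, HTTPError (error status codes) and timeouts are all OSError subclasses
        return False


def wait_until_healthy(
    probe: Callable[[], bool], retries: int = 10, delay: float = 2.0
) -> bool:
    """Call `probe` up to `retries` times, sleeping `delay` seconds between failed attempts."""
    for attempt in range(retries):
        if probe():
            return True
        if attempt < retries - 1:
            time.sleep(delay)
    return False
```

In this notebook the probe could be `lambda: http_probe(f"{BASE_URL}/v1/health")`. Passing the probe as a callable keeps the retry logic independent of the HTTP client, so the same loop works with an aiohttp-based check.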

In [None]:
async def fetch_health_status():
    """Fetch health status asynchronously."""
    url = f"{BASE_URL}/v1/health"
    params = {"check_dependencies": "True"}
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params) as response:
            await print_response(response)


# Run the async function
await fetch_health_status()

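#### 4. Create Collection Endpoint

**Purpose:**
This endpoint creates a collection in the vector store. You can optionally pass a metadata schema that defines the custom metadata fields the documents in the collection can carry.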
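The request body is a small JSON object with the three fields used in the cell below. A minimal sketch of building and sanity-checking it before sending; `build_collection_payload` is a hypothetical helper, and its checks (non-empty name, positive dimension, `name` and `type` keys on every schema field) are local assumptions, not documented server rules. The embedding dimension should match the output size of the embedding model the server uses; 2048 is simply this notebook's default.

```python
from typing import Any, Dict, List, Optional


def build_collection_payload(
    collection_name: str,
    embedding_dimension: int = 2048,
    metadata_schema: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Build the JSON body for POST /v1/collection, failing fast on obvious mistakes."""
    if not collection_name:
        raise ValueError("collection_name must be a non-empty string")
    if embedding_dimension <= 0:
        raise ValueError("embedding_dimension must be positive")
    schema = metadata_schema or []
    for field in schema:
        # Assumed minimum: every schema field needs a name and a type
        missing = {"name", "type"} - set(field)
        if missing:
            raise ValueError(f"schema field {field!r} is missing {sorted(missing)}")
    return {
        "collection_name": collection_name,
        "embedding_dimension": embedding_dimension,
        "metadata_schema": schema,
    }
```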

In [None]:
from typing import Optional


async def create_collection(
    collection_name: str,
    embedding_dimension: int = 2048,
    metadata_schema: Optional[list] = None,
):
    """Create a collection, optionally with a metadata schema."""
    data = {
        "collection_name": collection_name,
        "embedding_dimension": embedding_dimension,
        # Default to an empty schema without using a shared mutable default argument
        "metadata_schema": metadata_schema or [],
    }

    async with aiohttp.ClientSession() as session:
        try:
            # json= serializes the body and sets the Content-Type header
            async with session.post(f"{BASE_URL}/v1/collection", json=data) as response:
                await print_response(response)
        except aiohttp.ClientError as e:
            print(f"Error: {e}")


# [Optional]: Define schema for metadata fields
metadata_schema = [
    {
        "name": "timestamp",
        "type": "datetime",  # Field of type datetime (i.e., a string in ISO 8601 format)
        "description": "Timestamp of when the document was created",
    },
    {
        "name": "meta_field_1",
        "type": "string",
        "description": "Description of the document",
    },
]

# Call the create collection method
await create_collection(
    collection_name="multimodal_data",
    metadata_schema=metadata_schema,  # Optional; omit this argument if you do not need custom metadata
)

#### 5. Upload Document Endpoint

**Purpose:**
This endpoint uploads new documents to the vector store.
1. You can specify the collection in which the documents should be stored.
2. The collection to which the documents are uploaded must already exist in the vector database.
3. The documents being uploaded must not already exist in the collection. To reingest files that already exist in the collection, replace `session.post(...)` with `session.patch(...)`.
4. To speed up ingestion, you can pass multiple files in a single request, as shown below.
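Two mistakes are easy to make with custom metadata: referencing a filename that is not in the upload list, and writing a timestamp that is not valid ISO 8601. A minimal local check, assuming metadata entries are shaped like `CUSTOM_METADATA` below and that the datetime field is named `timestamp` as in the schema defined earlier; `check_custom_metadata` is a hypothetical helper. Note that `datetime.fromisoformat` accepts the `YYYY-MM-DDTHH:MM:SS` form used here, but before Python 3.11 it rejects some other valid ISO 8601 variants.

```python
import os
from datetime import datetime
from typing import Any, Dict, List


def check_custom_metadata(
    filepaths: List[str],
    custom_metadata: List[Dict[str, Any]],
    datetime_field: str = "timestamp",
) -> List[str]:
    """Return human-readable problems; an empty list means no issues were found."""
    uploaded = {os.path.basename(path) for path in filepaths}
    problems = []
    for entry in custom_metadata:
        name = entry.get("filename")
        if name not in uploaded:
            problems.append(f"{name}: not in the upload list")
        value = entry.get("metadata", {}).get(datetime_field)
        if value is not None:
            try:
                datetime.fromisoformat(value)
            except (TypeError, ValueError):
                problems.append(f"{name}: {datetime_field}={value!r} is not ISO 8601")
    return problems
```

Running `check_custom_metadata(FILEPATHS, CUSTOM_METADATA)` after the next cell should return an empty list for the metadata defined there.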

In [None]:
# Filepaths
FILEPATHS = [
    "../data/multimodal/embedded_table.pdf",
    "../data/multimodal/functional_validation.pdf",
    "../data/multimodal/woods_frost.pdf",
    "../data/multimodal/multimodal_test.pdf",
    "../data/multimodal/table_test.pdf",
    "../data/multimodal/woods_frost.docx",
]

# [Optional]: Add filename specific custom metadata
# Note: the timestamp metadata field must be in ISO 8601 format so that the following operators are supported: "==", "<=",
CUSTOM_METADATA = [
    {
        "filename": "multimodal_test.pdf",
        "metadata": {
            "timestamp": "2000-05-15T10:23:00",
            "meta_field_1": "multimodal document",
        },
    },
    {
        "filename": "functional_validation.pdf",
        "metadata": {
            "timestamp": "2001-05-15T10:23:00",
            "meta_field_1": "functional validation document",
        },
    },
    {
        "filename": "woods_frost.pdf",
        "metadata": {
            "timestamp": "2002-05-15T10:23:00",
            "meta_field_1": "multimodal document",
        },
    },
]

In [None]:
import mimetypes


async def upload_documents(collection_name: str = ""):
    data = {
        "collection_name": collection_name,
        "blocking": False,  # If True, the request waits for ingestion to finish and the status API is not needed
        "split_options": {"chunk_size": 512, "chunk_overlap": 150},
        "custom_metadata": CUSTOM_METADATA,
        "generate_summary": False,  # Set to True to generate summaries for all documents after ingestion
    }

    form_data = aiohttp.FormData()
    for file_path in FILEPATHS:
        # Guess the MIME type from the extension; not every file is a PDF (e.g. .docx)
        content_type, _ = mimetypes.guess_type(file_path)
        # Read the file into memory so the handle is closed right away
        with open(file_path, "rb") as f:
            form_data.add_field(
                "documents",
                f.read(),
                filename=os.path.basename(file_path),
                content_type=content_type or "application/octet-stream",
            )

    form_data.add_field("data", json.dumps(data), content_type="application/json")

    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(
                f"{BASE_URL}/v1/documents", data=form_data
            ) as response:  # Replace with session.patch for reingesting
                await print_response(response)
                # Return the response JSON for task_id extraction
                return await response.json()
        except aiohttp.ClientError as e:
            print(f"Error: {e}")
            return None

# Store the response and extract task_id
upload_response = await upload_documents(collection_name="multimodal_data")
task_id = upload_response.get("task_id") if upload_response else None
print(f"Extracted task_id: {task_id}")

#### 6. Get Task Status Endpoint

**Purpose:**
This endpoint returns the status of an upload task, identified by the `task_id` returned by the upload request. Once the task is `"FINISHED"`, the endpoint also returns the status report of the upload.
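With `"blocking": False`, the upload request returns immediately and the task has to be polled until it finishes. A minimal polling sketch, assuming the status response is JSON with a top-level `state` field; that field name is an assumption, so inspect the output of `get_task_status` and adjust `state_key` to match. `poll_until_done` is a hypothetical helper, and `fetch_status` is any zero-argument callable that returns the parsed status response (the `get_task_status` cell below only prints it, so you would need a variant that returns the JSON).

```python
import time
from typing import Any, Callable, Dict, Iterable


def poll_until_done(
    fetch_status: Callable[[], Dict[str, Any]],
    done_states: Iterable[str] = ("FINISHED",),
    state_key: str = "state",  # assumed field name; verify against a real response
    interval: float = 5.0,
    timeout: float = 600.0,
) -> Dict[str, Any]:
    """Call fetch_status() every `interval` seconds until its state is in `done_states`."""
    done = set(done_states)
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status.get(state_key) in done:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task not done after {timeout}s; last status: {status}")
        time.sleep(interval)
```

Using `time.monotonic()` for the deadline keeps the timeout correct even if the system clock changes while polling.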

In [None]:
async def get_task_status(task_id: str):
    params = {"task_id": task_id}

    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(f"{BASE_URL}/v1/status", params=params) as response:
                await print_response(response)
        except aiohttp.ClientError as e:
            print(f"Error: {e}")

# Use the extracted task_id from the upload_documents response
if task_id:
    await get_task_status(task_id=task_id)
else:
    print("No task_id available. Please run the upload_documents cell first.")

#### 7. Get Documents Endpoint

**Purpose:**
This endpoint retrieves a list of documents ingested into the vector store for a specified collection.
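After an upload finishes, you can compare the ingested filenames against `FILEPATHS` to spot files that did not make it into the collection. A minimal sketch; extracting the names from the `fetch_documents` response depends on its JSON layout, which this notebook does not show, so `missing_uploads` (a hypothetical helper) simply takes the names as a list.

```python
import os
from typing import Iterable, List


def missing_uploads(filepaths: Iterable[str], ingested_names: Iterable[str]) -> List[str]:
    """Return basenames from `filepaths` that are absent from `ingested_names`, in upload order."""
    ingested = set(ingested_names)
    return [
        os.path.basename(path)
        for path in filepaths
        if os.path.basename(path) not in ingested
    ]
```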

In [None]:
async def fetch_documents(collection_name: str = ""):
    url = f"{BASE_URL}/v1/documents"
    params = {"collection_name": collection_name}
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, params=params) as response:
                await print_response(response)
        except aiohttp.ClientError as e:
            print(f"Error: {e}")


await fetch_documents(collection_name="multimodal_data")

#### 8. Delete Documents Endpoint

**Purpose:**
This endpoint deletes the specified documents from the vector store. The documents are identified by their filenames.
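The delete endpoints in this notebook send a JSON list in the request body, and the documents endpoint also takes `collection_name` as a query parameter. For environments without aiohttp, a minimal standard-library sketch that builds the same kind of request; `build_json_request` is a hypothetical helper, and actually sending the request with `urllib.request.urlopen(req)` requires a running server.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, Optional


def build_json_request(
    url: str, method: str, body: Any, params: Optional[Dict[str, str]] = None
) -> urllib.request.Request:
    """Build (but do not send) an HTTP request with a JSON body and optional query string."""
    if params:
        url = f"{url}?{urllib.parse.urlencode(params)}"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )
```

The same helper covers collection deletion, for example `build_json_request(f"{BASE_URL}/v1/collections", "DELETE", ["multimodal_data"])`.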

In [None]:
async def delete_documents(collection_name: str, file_names: list[str]):
    url = f"{BASE_URL}/v1/documents"
    params = {"collection_name": collection_name}
    async with aiohttp.ClientSession() as session:
        try:
            # The filenames to delete are sent as a JSON list in the request body
            async with session.delete(url, params=params, json=file_names) as response:
                await print_response(response)
        except aiohttp.ClientError as e:
            print(f"Error: {e}")


await delete_documents(
    collection_name="multimodal_data",
    file_names=["embedded_table.pdf", "table_test.pdf"],
)

#### 9. Get Collections Endpoint

**Purpose:**
This endpoint retrieves a list of all collection names available on the server. Collections are used to organize documents in the vector store.
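The read-only endpoints can also be called without aiohttp. A minimal standard-library sketch, assuming the endpoint returns a JSON body; `with_query` and `get_json` are hypothetical helpers, and the structure of the returned object is whatever the server sends, so compare it with the output of `fetch_collections`.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, Optional


def with_query(url: str, params: Optional[Dict[str, str]] = None) -> str:
    """Append URL-encoded query parameters to `url`, if any are given."""
    return f"{url}?{urllib.parse.urlencode(params)}" if params else url


def get_json(url: str, params: Optional[Dict[str, str]] = None, timeout: float = 10.0) -> Any:
    """Send a GET request and decode the JSON response body."""
    with urllib.request.urlopen(with_query(url, params), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, `get_json(f"{BASE_URL}/v1/collections")` mirrors the cell below, and `get_json(f"{BASE_URL}/v1/documents", {"collection_name": "multimodal_data"})` mirrors the Get Documents cell.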

In [None]:
async def fetch_collections():
    url = f"{BASE_URL}/v1/collections"
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url) as response:
                await print_response(response)
        except aiohttp.ClientError as e:
            print(f"Error: {e}")


await fetch_collections()

#### 10. Delete Collections Endpoint

**Purpose:**
This endpoint deletes the specified collections from the vector database server.
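Deleting a collection also removes every document stored in it, so it can be worth guarding the list of names before sending the request. A minimal sketch of such a guard; `safe_collection_list` and the idea of a `protected` list are hypothetical conventions for this notebook, not features of the ingestion API.

```python
from typing import Iterable, List


def safe_collection_list(
    requested: Iterable[str], protected: Iterable[str] = ()
) -> List[str]:
    """Return `requested` without duplicates, refusing to include any protected name."""
    blocked = set(protected)
    result: List[str] = []
    for name in requested:
        if name in blocked:
            raise ValueError(f"refusing to delete protected collection {name!r}")
        if name not in result:
            result.append(name)
    return result
```

The result can be passed straight to the cell below, for example `await delete_collections(collection_names=safe_collection_list(["multimodal_data"]))`.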

In [None]:
async def delete_collections(collection_names: list[str]):
    url = f"{BASE_URL}/v1/collections"
    async with aiohttp.ClientSession() as session:
        try:
            async with session.delete(url, json=collection_names) as response:
                await print_response(response)
        except aiohttp.ClientError as e:
            print(f"Error: {e}")


await delete_collections(collection_names=["multimodal_data"])