# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from typing import Dict, Any, Optional
import pandas as pd
from nv_ingest_api.internal.enums.common import ContentTypeEnum
from nv_ingest_api.internal.schemas.store.store_embedding_schema import EmbeddingStorageSchema
from nv_ingest_api.internal.store.embed_text_upload import store_text_embeddings_internal
from nv_ingest_api.internal.store.image_upload import store_images_to_minio_internal
from nv_ingest_api.util.exception_handlers.decorators import unified_exception_handler
[docs]
@unified_exception_handler
def store_embeddings(
    *,
    df_ledger: pd.DataFrame,
    milvus_address: Optional[str] = None,
    milvus_uri: Optional[str] = None,
    milvus_host: Optional[str] = None,
    milvus_port: Optional[int] = None,
    milvus_collection_name: Optional[str] = None,
    minio_access_key: Optional[str] = None,
    minio_secret_key: Optional[str] = None,
    minio_session_token: Optional[str] = None,
    minio_endpoint: Optional[str] = None,
    minio_bucket_name: Optional[str] = None,
    minio_bucket_path: Optional[str] = None,
    minio_secure: Optional[bool] = None,
    minio_region: Optional[str] = None,
) -> pd.DataFrame:
    """
    Stores embeddings by configuring task parameters and invoking the internal storage routine.
    If any of the connection or configuration parameters are None, they will be omitted from the task
    configuration, allowing default values defined in the storage schema to be used.
    Parameters
    ----------
    df_ledger : pd.DataFrame
        DataFrame containing the data whose embeddings need to be stored.
    milvus_address : Optional[str], default=None
        The address of the Milvus service.
    milvus_uri : Optional[str], default=None
        The URI for the Milvus service.
    milvus_host : Optional[str], default=None
        The host for the Milvus service.
    milvus_port : Optional[int], default=None
        The port for the Milvus service.
    milvus_collection_name : Optional[str], default=None
        The name of the Milvus collection.
    minio_access_key : Optional[str], default=None
        The access key for MinIO.
    minio_secret_key : Optional[str], default=None
        The secret key for MinIO.
    minio_session_token : Optional[str], default=None
        The session token for MinIO.
    minio_endpoint : Optional[str], default=None
        The endpoint URL for MinIO.
    minio_bucket_name : Optional[str], default=None
        The name of the MinIO bucket.
    minio_bucket_path : Optional[str], default=None
        The bucket path where embeddings will be stored.
    minio_secure : Optional[bool], default=None
        Whether to use a secure connection to MinIO.
    minio_region : Optional[str], default=None
        The region of the MinIO service.
    Returns
    -------
    pd.DataFrame
        The updated DataFrame after embeddings have been stored.
    Raises
    ------
    Exception
        Propagates any exception raised during the storage process, wrapped with additional context.
    """
    params: Dict[str, Any] = {
        "milvus_address": milvus_address,
        "milvus_collection_name": milvus_collection_name,
        "milvus_host": milvus_host,
        "milvus_port": milvus_port,
        "milvus_uri": milvus_uri,
        "minio_access_key": minio_access_key,
        "minio_bucket_name": minio_bucket_name,
        "minio_bucket_path": minio_bucket_path,
        "minio_endpoint": minio_endpoint,
        "minio_region": minio_region,
        "minio_secret_key": minio_secret_key,
        "minio_secure": minio_secure,
        "minio_session_token": minio_session_token,
    }
    # Remove keys with None values so that default values in the storage schema are used.
    filtered_params = {key: value for key, value in params.items() if value is not None}
    task_config: Dict[str, Any] = {"params": filtered_params}
    store_config = EmbeddingStorageSchema()
    result, _ = store_text_embeddings_internal(
        df_ledger,
        task_config=task_config,
        store_config=store_config,
        execution_trace_log=None,
    )
    return result 
[docs]
@unified_exception_handler
def store_images_to_minio(
    *,
    df_ledger: pd.DataFrame,
    store_structured: bool = True,
    store_unstructured: bool = False,
    minio_access_key: Optional[str] = None,
    minio_bucket_name: Optional[str] = None,
    minio_endpoint: Optional[str] = None,
    minio_region: Optional[str] = None,
    minio_secret_key: Optional[str] = None,
    minio_secure: bool = False,
    minio_session_token: Optional[str] = None,
) -> pd.DataFrame:
    """
    Store images to a Minio storage backend.
    This function prepares a flat configuration dictionary for storing images and structured
    data to a Minio storage system. It determines which content types to store based on the
    provided flags and delegates the storage operation to the internal function
    `store_images_to_minio_internal`.
    Parameters
    ----------
    df_ledger : pd.DataFrame
        DataFrame containing ledger information with document metadata.
    store_structured : bool, optional
        Flag indicating whether to store structured content. Defaults to True.
    store_unstructured : bool, optional
        Flag indicating whether to store unstructured image content. Defaults to False.
    minio_access_key : Optional[str], optional
        Access key for authenticating with Minio. Defaults to None.
    minio_bucket_name : Optional[str], optional
        Name of the Minio bucket where images will be stored. Defaults to None.
    minio_endpoint : Optional[str], optional
        Endpoint URL for the Minio service. Defaults to None.
    minio_region : Optional[str], optional
        Region identifier for the Minio service. Defaults to None.
    minio_secret_key : Optional[str], optional
        Secret key for authenticating with Minio. Defaults to None.
    minio_secure : bool, optional
        Whether to use a secure connection (HTTPS) with Minio. Defaults to False.
    minio_session_token : Optional[str], optional
        Session token for temporary credentials with Minio. Defaults to None.
    Returns
    -------
    pd.DataFrame
        The updated DataFrame after uploading images if matching objects were found;
        otherwise, the original DataFrame is returned.
    Raises
    ------
    Exception
        Any exceptions raised during the image storage process will be handled by the
        `unified_exception_handler` decorator.
    See Also
    --------
    store_images_to_minio_internal : Internal function that performs the actual image storage.
    _upload_images_to_minio : Function that uploads images to MinIO and updates the ledger metadata.
    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame({
    ...     'document_type': ['IMAGE'],
    ...     'metadata': [{
    ...         'source_metadata': {'source_id': '123'},
    ...         'image_metadata': {'image_type': 'png'},
    ...         'content': 'base64_encoded_content'
    ...     }]
    ... })
    >>> result = store_images_to_minio(
    ...     df_ledger=df,
    ...     minio_access_key='ACCESS_KEY',
    ...     minio_secret_key='SECRET_KEY',
    ...     minio_bucket_name='mybucket'
    ... )
    """
    content_types = {
        ContentTypeEnum.STRUCTURED: store_structured,
        ContentTypeEnum.IMAGE: store_unstructured,
    }
    # Build the task configuration as a flat dictionary, matching the internal function's expectations.
    task_config = {
        "access_key": minio_access_key,
        "bucket_name": minio_bucket_name,
        "content_types": content_types,
        "endpoint": minio_endpoint,
        "region": minio_region,
        "secret_key": minio_secret_key,
        "secure": minio_secure,
        "session_token": minio_session_token,
    }
    storage_config = {}
    result, _ = store_images_to_minio_internal(
        df_storage_ledger=df_ledger,
        task_config=task_config,
        storage_config=storage_config,
        execution_trace_log=None,
    )
    return result