Source code for nv_ingest_api.interface.store

# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from typing import Dict, Any, Optional

import pandas as pd

from nv_ingest_api.internal.enums.common import ContentTypeEnum
from nv_ingest_api.internal.schemas.store.store_embedding_schema import EmbeddingStorageSchema
from nv_ingest_api.internal.store.embed_text_upload import store_text_embeddings_internal
from nv_ingest_api.internal.store.image_upload import store_images_to_minio_internal
from nv_ingest_api.util.exception_handlers.decorators import unified_exception_handler


[docs] @unified_exception_handler def store_embeddings( *, df_ledger: pd.DataFrame, milvus_address: Optional[str] = None, milvus_uri: Optional[str] = None, milvus_host: Optional[str] = None, milvus_port: Optional[int] = None, milvus_collection_name: Optional[str] = None, minio_access_key: Optional[str] = None, minio_secret_key: Optional[str] = None, minio_session_token: Optional[str] = None, minio_endpoint: Optional[str] = None, minio_bucket_name: Optional[str] = None, minio_bucket_path: Optional[str] = None, minio_secure: Optional[bool] = None, minio_region: Optional[str] = None, ) -> pd.DataFrame: """ Stores embeddings by configuring task parameters and invoking the internal storage routine. If any of the connection or configuration parameters are None, they will be omitted from the task configuration, allowing default values defined in the storage schema to be used. Parameters ---------- df_ledger : pd.DataFrame DataFrame containing the data whose embeddings need to be stored. milvus_address : Optional[str], default=None The address of the Milvus service. milvus_uri : Optional[str], default=None The URI for the Milvus service. milvus_host : Optional[str], default=None The host for the Milvus service. milvus_port : Optional[int], default=None The port for the Milvus service. milvus_collection_name : Optional[str], default=None The name of the Milvus collection. minio_access_key : Optional[str], default=None The access key for MinIO. minio_secret_key : Optional[str], default=None The secret key for MinIO. minio_session_token : Optional[str], default=None The session token for MinIO. minio_endpoint : Optional[str], default=None The endpoint URL for MinIO. minio_bucket_name : Optional[str], default=None The name of the MinIO bucket. minio_bucket_path : Optional[str], default=None The bucket path where embeddings will be stored. minio_secure : Optional[bool], default=None Whether to use a secure connection to MinIO. minio_region : Optional[str], default=None The region of the MinIO service. Returns ------- pd.DataFrame The updated DataFrame after embeddings have been stored. Raises ------ Exception Propagates any exception raised during the storage process, wrapped with additional context. """ params: Dict[str, Any] = { "milvus_address": milvus_address, "milvus_collection_name": milvus_collection_name, "milvus_host": milvus_host, "milvus_port": milvus_port, "milvus_uri": milvus_uri, "minio_access_key": minio_access_key, "minio_bucket_name": minio_bucket_name, "minio_bucket_path": minio_bucket_path, "minio_endpoint": minio_endpoint, "minio_region": minio_region, "minio_secret_key": minio_secret_key, "minio_secure": minio_secure, "minio_session_token": minio_session_token, } # Remove keys with None values so that default values in the storage schema are used. filtered_params = {key: value for key, value in params.items() if value is not None} task_config: Dict[str, Any] = {"params": filtered_params} store_config = EmbeddingStorageSchema() result, _ = store_text_embeddings_internal( df_ledger, task_config=task_config, store_config=store_config, execution_trace_log=None, ) return result
[docs] @unified_exception_handler def store_images_to_minio( *, df_ledger: pd.DataFrame, store_structured: bool = True, store_unstructured: bool = False, minio_access_key: Optional[str] = None, minio_bucket_name: Optional[str] = None, minio_endpoint: Optional[str] = None, minio_region: Optional[str] = None, minio_secret_key: Optional[str] = None, minio_secure: bool = False, minio_session_token: Optional[str] = None, ) -> pd.DataFrame: """ Store images to a Minio storage backend. This function prepares a flat configuration dictionary for storing images and structured data to a Minio storage system. It determines which content types to store based on the provided flags and delegates the storage operation to the internal function `store_images_to_minio_internal`. Parameters ---------- df_ledger : pd.DataFrame DataFrame containing ledger information with document metadata. store_structured : bool, optional Flag indicating whether to store structured content. Defaults to True. store_unstructured : bool, optional Flag indicating whether to store unstructured image content. Defaults to False. minio_access_key : Optional[str], optional Access key for authenticating with Minio. Defaults to None. minio_bucket_name : Optional[str], optional Name of the Minio bucket where images will be stored. Defaults to None. minio_endpoint : Optional[str], optional Endpoint URL for the Minio service. Defaults to None. minio_region : Optional[str], optional Region identifier for the Minio service. Defaults to None. minio_secret_key : Optional[str], optional Secret key for authenticating with Minio. Defaults to None. minio_secure : bool, optional Whether to use a secure connection (HTTPS) with Minio. Defaults to False. minio_session_token : Optional[str], optional Session token for temporary credentials with Minio. Defaults to None. Returns ------- pd.DataFrame The updated DataFrame after uploading images if matching objects were found; otherwise, the original DataFrame is returned. Raises ------ Exception Any exceptions raised during the image storage process will be handled by the `unified_exception_handler` decorator. See Also -------- store_images_to_minio_internal : Internal function that performs the actual image storage. _upload_images_to_minio : Function that uploads images to MinIO and updates the ledger metadata. Examples -------- >>> import pandas as pd >>> df = pd.DataFrame({ ... 'document_type': ['IMAGE'], ... 'metadata': [{ ... 'source_metadata': {'source_id': '123'}, ... 'image_metadata': {'image_type': 'png'}, ... 'content': 'base64_encoded_content' ... }] ... }) >>> result = store_images_to_minio( ... df_ledger=df, ... minio_access_key='ACCESS_KEY', ... minio_secret_key='SECRET_KEY', ... minio_bucket_name='mybucket' ... ) """ content_types = { ContentTypeEnum.STRUCTURED: store_structured, ContentTypeEnum.IMAGE: store_unstructured, } # Build the task configuration as a flat dictionary, matching the internal function's expectations. task_config = { "access_key": minio_access_key, "bucket_name": minio_bucket_name, "content_types": content_types, "endpoint": minio_endpoint, "region": minio_region, "secret_key": minio_secret_key, "secure": minio_secure, "session_token": minio_session_token, } storage_config = {} result, _ = store_images_to_minio_internal( df_storage_ledger=df_ledger, task_config=task_config, storage_config=storage_config, execution_trace_log=None, ) return result