Source code for nv_ingest_client.primitives.tasks.store

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


# pylint: disable=too-few-public-methods
# pylint: disable=too-many-arguments

import logging
from typing import Dict
from typing import Literal

from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import model_validator

from .task_base import Task

logger = logging.getLogger(__name__)

_DEFAULT_STORE_METHOD = "minio"


class StoreEmbedTaskSchema(BaseModel):
    """
    Schema for the properties of a store-embedding task.

    The schema declares no fields of its own; arbitrary extra keys are
    accepted and kept on the model because extra fields are allowed.
    """

    # ``model_config`` replaces the class-based ``Config``, which pydantic v2
    # deprecates (it emits a deprecation warning at class-creation time).
    model_config = ConfigDict(extra="allow")
class StoreTaskSchema(BaseModel):
    """
    Schema for the properties of a store task.

    ``store_method`` falls back to the module default when it is missing or
    ``None``. Any additional keys are accepted and kept on the model because
    extra fields are allowed.
    """

    # ``model_config`` replaces the class-based ``Config``, which pydantic v2
    # deprecates (it emits a deprecation warning at class-creation time).
    model_config = ConfigDict(extra="allow")

    # The "before" validator below fills this in whenever the input is a dict
    # without a usable value, so the ``None`` default is effectively a placeholder.
    store_method: str = None

    @model_validator(mode="before")
    @classmethod
    def set_default_store_method(cls, values):
        """
        Fill in the default store method when none was supplied.

        Parameters
        ----------
        values
            Raw, not-yet-validated input handed to the model.

        Returns
        -------
        The input to validate: a copy with ``store_method`` set to the default
        when the key is absent or ``None``, otherwise the input unchanged.
        """
        # A "before" validator sees the raw input, which is not guaranteed to
        # be a dict. Pass anything else through untouched so pydantic reports
        # a regular validation error instead of this method raising
        # AttributeError on ``.get``.
        if not isinstance(values, dict):
            return values

        if values.get("store_method") is None:
            # Build a new dict rather than mutating the caller's input in place.
            values = {**values, "store_method": _DEFAULT_STORE_METHOD}

        return values
class StoreTask(Task):
    """
    Object for image storage task.
    """

    _Type_Content_Type = Literal["image"]
    _Type_Store_Method = Literal["minio"]

    def __init__(
        self,
        structured: bool = True,
        images: bool = False,
        store_method: _Type_Store_Method = None,
        params: dict = None,
        **extra_params,
    ) -> None:
        """
        Setup Store Task Config

        Parameters
        ----------
        structured : bool
            Stored on the task and reported as "store structured types".
        images : bool
            Stored on the task and reported as "store image types".
        store_method : str, optional
            Storage backend name; falls back to "minio" when falsy.
        params : dict, optional
            Extra parameters forwarded verbatim under the "params" key of the
            serialized task. Kept as ``None`` when not provided.
        **extra_params
            Additional keyword arguments merged into the task properties.
        """
        super().__init__()

        self._structured = structured
        self._images = images
        self._store_method = store_method or "minio"
        # Deliberately stored as given (possibly None) so the serialized
        # payload produced by ``to_dict`` is unchanged for existing callers.
        self._params = params
        self._extra_params = extra_params

    def __str__(self) -> str:
        """
        Returns a string with the object's config and run time state
        """
        info = ""
        info += "Store Task:\n"
        info += f" store structured types: {self._structured}\n"
        info += f" store image types: {self._images}\n"
        info += f" store method: {self._store_method}\n"
        for key, value in self._extra_params.items():
            info += f" {key}: {value}\n"
        # ``params`` defaults to None; iterate an empty mapping in that case
        # instead of raising AttributeError on ``None.items()``.
        for key, value in (self._params or {}).items():
            info += f" {key}: {value}\n"

        return info

    def to_dict(self) -> Dict:
        """
        Convert to a dict for submission to redis (fixme)

        Returns
        -------
        Dict
            ``{"type": "store", "task_properties": {...}}`` where the
            properties hold the store method, the structured/images flags,
            ``params`` exactly as passed to the constructor, and any extra
            keyword parameters.
        """
        task_properties = {
            "method": self._store_method,
            "structured": self._structured,
            "images": self._images,
            "params": self._params,
            **self._extra_params,
        }

        return {"type": "store", "task_properties": task_properties}
class StoreEmbedTask(Task):
    """
    Task object describing a store-embedding job.
    """

    _Type_Content_Type = Literal["embedding"]
    _Type_Store_Method = Literal["minio"]

    def __init__(self, params: dict = None, **extra_params) -> None:
        """
        Record the configuration for the store-embedding task.

        Parameters
        ----------
        params : dict, optional
            Parameters placed under the "params" key of the serialized task;
            a falsy value is replaced by an empty dict.
        **extra_params
            Additional keyword arguments merged into the task properties.
        """
        super().__init__()

        self._params = params if params else {}
        self._extra_params = extra_params

    def __str__(self) -> str:
        """
        Render the task configuration as a multi-line, human-readable string.
        """
        # Header first, then one line per extra parameter, then one per param.
        pieces = ["Store Embed Task:\n"]
        pieces.extend(f" {name}: {setting}\n" for name, setting in self._extra_params.items())
        pieces.extend(f" {name}: {setting}\n" for name, setting in self._params.items())

        return "".join(pieces)

    def to_dict(self) -> Dict:
        """
        Build the dict form of the task used for submission (to redis, per the original note).

        Returns
        -------
        Dict
            ``{"type": "store_embedding", "task_properties": {...}}`` with
            ``params`` first, followed by any extra keyword parameters.
        """
        properties = {"params": self._params}
        properties.update(self._extra_params)

        payload = {"type": "store_embedding", "task_properties": properties}
        return payload