Source code for nv_ingest_client.primitives.tasks.task_base

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


# pylint: disable=too-few-public-methods
# pylint: disable=too-many-arguments

import logging
from enum import Enum
from enum import auto
from typing import Dict

logger = logging.getLogger(__name__)


[docs] class TaskType(Enum): CAPTION = auto() DEDUP = auto() EMBED = auto() EXTRACT = auto() FILTER = auto() SPLIT = auto() TRANSFORM = auto() STORE_EMBEDDING = auto() STORE = auto() VDB_UPLOAD = auto() TABLE_DATA_EXTRACT = auto() CHART_DATA_EXTRACT = auto() INFOGRAPHIC_DATA_EXTRACT = auto()
[docs] def is_valid_task_type(task_type_str: str) -> bool: """ Checks if the provided string is a valid TaskType enum value. Parameters ---------- task_type_str : str The string to check against the TaskType enum values. Returns ------- bool True if the string is a valid TaskType enum value, False otherwise. """ return task_type_str in TaskType.__members__
[docs] class Task: """ Generic task Object """ def __init__(self) -> None: """ Setup Ingest Task Config """ def __str__(self) -> str: """ Returns a string with the object's config and run time state """ info = "" info += f"{self.__class__.__name__}\n" return info
[docs] def to_dict(self) -> Dict: """ Returns a string with the task specification. This string is used for constructing tasks that are then submitted to the redis client """ return {}
# class ExtractUnstructuredTask(ExtractTask): # """ # Object for document unstructured extraction task # extract_method = ["unstructured_local", "unstructured_service"] # """ # # def __init__( # self, # extract_method: ExtractTask._Type_Extract_Method, # document_type: ExtractTask._TypeDocumentType, # api_key: str, # uri: str, # ) -> None: # """ # Setup Extract Task Config # """ # super().__init__(extract_method, document_type) # self._api_key = api_key # self._uri = uri # # def __str__(self) -> str: # """ # Returns a string with the object's config and run time state # """ # info = "" # info += super().__str__() # info += f"unstructured uri: {self._uri}\n" # return info # # def to_dict(self) -> Dict: # """ # Convert to a dict for submission to redis (fixme) # """ # unstructured_properties = { # "api_key": self._api_key, # "unstructured_url": self._uri, # } # task_desc = super().to_dict() # task_desc["task_properties"]["params"].update(unstructured_properties) # return task_desc # class ExtractLlamaParseTask(ExtractTask): # """ # Object for document llama extraction task # extract_method = ["llama_parse"] # """ # # def __init__( # self, # extract_method: ExtractTask._Type_Extract_Method, # document_type: ExtractTask._TypeDocumentType, # api_key: str, # ) -> None: # """ # Setup Extract Task Config # """ # super().__init__(extract_method, document_type) # self._api_key = api_key # # def to_dict(self) -> Dict: # """ # Convert to a dict for submission to redis (fixme) # """ # llama_parse_properties = { # "api_key": self._api_key, # } # task_desc = super().to_dict() # task_desc["task_properties"]["params"].update(llama_parse_properties) # return task_desc