Source code for nv_ingest_client.primitives.tasks.dedup

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


# pylint: disable=too-few-public-methods
# pylint: disable=too-many-arguments

import logging
from typing import Dict
from typing import Literal

from pydantic import BaseModel, ConfigDict, field_validator

from .task_base import Task

logger = logging.getLogger(__name__)


class DedupTaskSchema(BaseModel):
    """
    Pydantic schema describing the options accepted by a dedup task.

    Fields that are not declared on the model are rejected
    (``extra="forbid"``), and ``content_type`` is restricted to the
    values listed in :meth:`content_type_must_be_valid`.
    """

    # Pydantic v2 style configuration. The nested ``class Config`` form is
    # deprecated in v2 and triggers a deprecation warning at class creation.
    model_config = ConfigDict(extra="forbid")

    # Content type the task targets; validated against an allow-list below.
    content_type: str = "image"
    # Boolean option carried by the schema; defaults to False.
    filter: bool = False

    @field_validator("content_type")
    @classmethod
    def content_type_must_be_valid(cls, v: str) -> str:
        """
        Check that ``content_type`` is one of the supported values.

        Parameters
        ----------
        v : str
            The candidate ``content_type`` value.

        Returns
        -------
        str
            ``v`` unchanged when it is supported.

        Raises
        ------
        ValueError
            If ``v`` is not in the list of supported content types.
        """
        valid_criteria = ["image"]
        if v not in valid_criteria:
            raise ValueError(f"content_type must be one of {valid_criteria}")
        return v
class DedupTask(Task):
    """
    Object for document dedup task.

    Stores a content type and a ``filter`` flag and can render them either
    as a human-readable string or as a task dictionary.
    """

    # Allowed literal values for the ``content_type`` constructor argument.
    _TypeContentType = Literal["image"]

    def __init__(
        self,
        content_type: _TypeContentType = "image",
        filter: bool = False,
    ) -> None:
        """
        Set up the dedup task configuration.

        Parameters
        ----------
        content_type : Literal["image"]
            Content type stored on the task; defaults to ``"image"``.
        filter : bool
            Flag stored on the task and emitted under ``params`` by
            :meth:`to_dict`; defaults to ``False``.
        """
        super().__init__()
        self._filter = filter
        self._content_type = content_type

    def __str__(self) -> str:
        """
        Return a multi-line string with the task's configuration values.
        """
        pieces = (
            "Dedup Task:\n",
            f" content_type: {self._content_type}\n",
            f" filter: {self._filter}\n",
        )
        return "".join(pieces)

    def to_dict(self) -> Dict:
        """
        Convert the task to a dict for submission to redis.

        Returns
        -------
        Dict
            A mapping with ``"type"`` set to ``"dedup"`` and
            ``"task_properties"`` holding the content type and a ``params``
            sub-dict containing the ``filter`` flag.
        """
        return {
            "type": "dedup",
            "task_properties": {
                "content_type": self._content_type,
                "params": {"filter": self._filter},
            },
        }