Source code for nv_ingest_client.primitives.tasks.dedup
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# pylint: disable=too-few-public-methods
# pylint: disable=too-many-arguments
import logging
from typing import Dict
from typing import Literal
from pydantic import BaseModel, field_validator
from .task_base import Task
logger = logging.getLogger(__name__)
[docs]
class DedupTaskSchema(BaseModel):
    content_type: str = "image"
    filter: bool = False
[docs]
    @field_validator("content_type")
    def content_type_must_be_valid(cls, v):
        valid_criteria = ["image"]
        if v not in valid_criteria:
            raise ValueError(f"content_type must be one of {valid_criteria}")
        return v 
[docs]
    class Config:
        extra = "forbid" 
 
[docs]
class DedupTask(Task):
    """
    Object for document dedup task
    """
    _TypeContentType = Literal["image"]
    def __init__(
        self,
        content_type: _TypeContentType = "image",
        filter: bool = False,
    ) -> None:
        """
        Setup Dedup Task Config
        """
        super().__init__()
        self._content_type = content_type
        self._filter = filter
    def __str__(self) -> str:
        """
        Returns a string with the object's config and run time state
        """
        info = ""
        info += "Dedup Task:\n"
        info += f"  content_type: {self._content_type}\n"
        info += f"  filter: {self._filter}\n"
        return info
[docs]
    def to_dict(self) -> Dict:
        """
        Convert to a dict for submission to redis
        """
        dedup_params = {"filter": self._filter}
        task_properties = {
            "content_type": self._content_type,
            "params": dedup_params,
        }
        return {"type": "dedup", "task_properties": task_properties}