# Source code for nv_ingest_api.internal.mutate.deduplicate

# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import logging
import hashlib
from typing import Any, Dict, Optional, List

import pandas as pd

from nv_ingest_api.internal.enums.common import ContentTypeEnum
from nv_ingest_api.internal.schemas.mutate.mutate_image_dedup_schema import ImageDedupSchema

logger = logging.getLogger(__name__)


def _hash_content(x: Any, algorithm: str = "md5") -> bytes:
    """
    Compute a hash of the content using the specified algorithm.

    Parameters
    ----------
    x : dict
        A dictionary containing the content under the key "content".
    algorithm : str, optional
        Hashing algorithm to use (default "md5").

    Returns
    -------
    bytes
        The computed hash.
    """
    try:
        return hashlib.new(algorithm, x["content"].encode()).digest()
    except Exception as e:
        msg = f"hash_content: Error computing hash: {e}"
        logger.error(msg, exc_info=True)
        raise type(e)(msg) from e
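
# Illustrative usage sketch (not part of the module): _hash_content expects a
# mapping with a string payload under "content" (in this pipeline, the per-row
# "metadata" dict) and accepts any algorithm name that hashlib.new() supports.
#
#   _hash_content({"content": "<base64 image payload>"})            # 16-byte md5 digest
#   _hash_content({"content": "<base64 image payload>"}, "sha256")  # 32-byte sha256 digest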


def deduplicate_images_internal(
    df_ledger: pd.DataFrame,
    task_config: Dict[str, Any],
    mutate_config: ImageDedupSchema = ImageDedupSchema(),
    execution_trace_log: Optional[List[Any]] = None,
) -> pd.DataFrame:
    """
    Deduplicate images in a DataFrame based on content hashes.

    The function processes rows whose 'document_type' is IMAGE, computes a
    content hash for each, and removes duplicate rows. A 'hash_algorithm' key
    in task_config selects the algorithm used for hashing.

    Parameters
    ----------
    df_ledger : pd.DataFrame
        DataFrame containing at least 'document_type' and 'metadata' columns.
    task_config : dict
        Configuration parameters, including:

        - "hash_algorithm": str, the algorithm to use for hashing (default "md5").
    mutate_config : ImageDedupSchema, optional
        Currently unused.
    execution_trace_log : Optional[List[Any]], optional
        Currently unused; trace logging is not yet implemented.

    Returns
    -------
    pd.DataFrame
        The DataFrame with duplicate image rows removed.

    Raises
    ------
    ValueError
        If the required columns are missing.
    Exception
        For any other errors encountered during deduplication.
    """
    _ = mutate_config  # Unused.
    _ = execution_trace_log  # TODO(Devin): Implement trace logging.

    try:
        # Verify required columns exist.
        for col in ("document_type", "metadata"):
            if col not in df_ledger.columns:
                raise ValueError(f"Missing required column '{col}'.")

        # Select image rows.
        image_mask = df_ledger["document_type"] == ContentTypeEnum.IMAGE
        if not image_mask.any():
            return df_ledger[~image_mask]

        df_images = df_ledger.loc[image_mask].copy()
        hash_algorithm = task_config.get("hash_algorithm", "md5")

        # Compute a content hash for each image, then drop rows with repeated hashes.
        df_images["_image_content_hash"] = df_images["metadata"].apply(
            _hash_content, args=(hash_algorithm,)
        )
        df_images_deduped = df_images.drop_duplicates(subset="_image_content_hash")
        deduped_indices = df_images_deduped.index

        # Recombine the deduplicated image rows (minus the temporary hash
        # column) with the untouched non-image rows.
        non_image_rows = df_ledger.loc[~image_mask]
        deduped_images = df_images.loc[deduped_indices][
            df_ledger.columns.difference(["_image_content_hash"])
        ]

        return pd.concat([deduped_images, non_image_rows], axis=0)
    except Exception as e:
        msg = f"deduplicate_images_internal: Error applying deduplication filter: {e}"
        logger.error(msg, exc_info=True)
        raise type(e)(msg) from e
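

# Minimal usage sketch (the two-column ledger below is illustrative; real
# nv-ingest ledgers carry the full metadata schema):
#
#   df = pd.DataFrame(
#       {
#           "document_type": [ContentTypeEnum.IMAGE, ContentTypeEnum.IMAGE],
#           "metadata": [{"content": "AAAA"}, {"content": "AAAA"}],
#       }
#   )
#   deduped = deduplicate_images_internal(df, task_config={"hash_algorithm": "md5"})
#   assert len(deduped) == 1  # the identical second image row is dropped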