Source code for nv_ingest_api.internal.mutate.deduplicate
# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import hashlib
import logging
from typing import Any, Dict, List, Optional

import pandas as pd

from nv_ingest_api.internal.enums.common import ContentTypeEnum
from nv_ingest_api.internal.schemas.mutate.mutate_image_dedup_schema import ImageDedupSchema

logger = logging.getLogger(__name__)


def _hash_content(x: Any, algorithm: str = "md5") -> bytes:
"""
Compute a hash of the content using the specified algorithm.
Parameters
----------
x : dict
A dictionary containing the content under the key "content".
algorithm : str, optional
Hashing algorithm to use (default "md5").
Returns
-------
bytes
The computed hash.
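
    Examples
    --------
    A minimal, illustrative call; real callers pass a per-row ``metadata``
    dict whose ``"content"`` is typically a base64-encoded image string:

    >>> _hash_content({"content": "hello"}).hex()
    '5d41402abc4b2a76b9719d911017c592'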
"""
try:
return hashlib.new(algorithm, x["content"].encode()).digest()
except Exception as e:
msg = f"hash_content: Error computing hash: {e}"
logger.error(msg, exc_info=True)
raise type(e)(msg) from e


def deduplicate_images_internal(
    df_ledger: pd.DataFrame,
    task_config: Dict[str, Any],
    mutate_config: ImageDedupSchema = ImageDedupSchema(),
    execution_trace_log: Optional[List[Any]] = None,
) -> pd.DataFrame:
"""
Deduplicate images in a DataFrame based on content hashes.
The function processes rows where the 'document_type' is IMAGE, computes a content hash for each,
and then either removes duplicates or marks them based on the 'filter' flag in task_config.
A 'hash_algorithm' flag in task_config determines the algorithm used for hashing.
Parameters
----------
df_ledger : pd.DataFrame
DataFrame containing at least 'document_type' and 'metadata' columns.
task_config : dict
Configuration parameters, including:
- "filter": bool, if True duplicate rows are removed; if False, duplicates are marked.
- "hash_algorithm": str, the algorithm to use for hashing (default "md5").
mutate_config : ImageDedupSchema, optional
execution_trace_log : Optional[List[Any]], optional
Returns
-------
pd.DataFrame
The DataFrame with duplicate images either removed or marked.
Raises
------
ValueError
If the required columns are missing.
Exception
For any other errors encountered during deduplication.
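
    Examples
    --------
    A minimal sketch with hypothetical inline content; real pipelines typically
    store base64-encoded image data under ``metadata["content"]``:

    >>> df = pd.DataFrame(
    ...     {
    ...         "document_type": [ContentTypeEnum.IMAGE, ContentTypeEnum.IMAGE],
    ...         "metadata": [{"content": "AAA"}, {"content": "AAA"}],
    ...     }
    ... )
    >>> deduped = deduplicate_images_internal(df, {"hash_algorithm": "md5"})
    >>> len(deduped)
    1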
"""
_ = mutate_config # Unused variable
_ = execution_trace_log # TODO(Devin): Implement trace logging
    try:
        # Verify required columns exist.
        for col in ("document_type", "metadata"):
            if col not in df_ledger.columns:
                raise ValueError(f"Missing required column '{col}'.")

        # Select image rows; with no images there is nothing to deduplicate.
        image_mask = df_ledger["document_type"] == ContentTypeEnum.IMAGE
        if not image_mask.any():
            return df_ledger

        df_images = df_ledger.loc[image_mask].copy()
        hash_algorithm = task_config.get("hash_algorithm", "md5")

        # Compute a content hash for each image and keep only the first
        # occurrence of each distinct hash.
        df_images["_image_content_hash"] = df_images["metadata"].apply(_hash_content, args=(hash_algorithm,))
        df_images_deduped = df_images.drop_duplicates(subset="_image_content_hash").drop(
            columns=["_image_content_hash"]
        )

        # Recombine the deduplicated image rows with the non-image rows.
        non_image_rows = df_ledger.loc[~image_mask]
        return pd.concat([df_images_deduped, non_image_rows], axis=0)
    except Exception as e:
        msg = f"deduplicate_images_internal: Error applying deduplication filter: {e}"
        logger.error(msg, exc_info=True)
        raise type(e)(msg) from e