# Source code for nv_ingest.stages.filters.image_dedup

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import functools
import hashlib
import logging
from typing import Any
from typing import Dict

import pandas as pd
from morpheus.config import Config
from morpheus.utils.module_utils import ModuleLoaderFactory
from pydantic import BaseModel

import cudf

from nv_ingest.schemas.image_dedup_schema import ImageDedupSchema
from nv_ingest.schemas.metadata_schema import ContentTypeEnum
from nv_ingest.schemas.metadata_schema import InfoMessageMetadataSchema
from nv_ingest.schemas.metadata_schema import StatusEnum
from nv_ingest.schemas.metadata_schema import TaskTypeEnum
from nv_ingest.stages.multiprocessing_stage import MultiProcessingBaseStage
from nv_ingest.util.schema.schema_validator import validate_schema
from nv_ingest_api.primitives.ingest_control_message import IngestControlMessage

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Name and namespace passed to ModuleLoaderFactory below.
MODULE_NAME = "dedup_images"
MODULE_NAMESPACE = "nv-ingest"
# Loader factory built from the module name, the namespace and ImageDedupSchema.
ImageDedupLoaderFactory = ModuleLoaderFactory(MODULE_NAME, MODULE_NAMESPACE, ImageDedupSchema)


def hash_content(x: Any, algorithm: str = "md5") -> bytes:
    """
    Computes a hash of the content using the specified algorithm.

    The value stored under the ``"content"`` key of ``x`` is encoded to bytes
    (``str.encode()`` default, i.e. UTF-8) and hashed with the requested
    ``hashlib`` algorithm.

    Parameters
    ----------
    x : Any
        A mapping whose ``"content"`` entry is the string to be hashed.
    algorithm : str, optional
        Name of any fixed-length algorithm accepted by ``hashlib.new``
        (for example ``"md5"``, ``"sha1"``, ``"sha256"``), by default "md5".

    Returns
    -------
    bytes
        The raw digest of the content.

    Raises
    ------
    Exception
        Any failure (missing ``"content"`` key, non-string content, unknown
        algorithm name, ...) is logged and re-raised as the same exception
        type with a message prefixed by ``hash_content:``.

    Examples
    --------
    >>> x = {"content": "example content"}
    >>> hash_content(x) == hashlib.md5(b"example content").digest()
    True
    >>> hash_content(x, "sha256") == hashlib.sha256(b"example content").digest()
    True

    Notes
    -----
    Variable-length algorithms (``shake_128``/``shake_256``) need an explicit
    digest length and are therefore not supported; requesting one results in a
    ``TypeError``.
    """
    try:
        # hashlib.new honours the caller's choice; with "md5" it yields the
        # same digest as hashlib.md5, so the default behaviour is unchanged.
        return hashlib.new(algorithm, x["content"].encode()).digest()
    except Exception as e:
        err_msg = f"hash_content: Error computing hash for content. Original error: {e}"
        logger.error(err_msg, exc_info=True)
        raise type(e)(err_msg) from e


def _cpu_only_apply_dedup_filter(df: pd.DataFrame, filter_flag: bool) -> pd.DataFrame:
    """
    Applies a deduplication filter to images in the DataFrame.

    Image rows are compared by the MD5 hash of ``metadata["content"]``. The
    first row carrying a given hash is kept as the original; every later row
    with the same hash is a duplicate. Duplicates are either removed or marked
    as informational messages, depending on ``filter_flag``.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame containing the data to be filtered. It must have a
        `document_type` column indicating content type and a `metadata` column
        whose entries are mappings with a `"content"` string.
    filter_flag : bool
        A flag indicating whether to filter out duplicates (`True`) or mark
        them with informational messages (`False`).

    Returns
    -------
    pd.DataFrame
        A new DataFrame with the same columns, in the same order, as `df`.

        - `filter_flag` truthy: the de-duplicated image rows followed by all
          non-image rows.
        - `filter_flag` falsy: all rows in their original order, where each
          duplicate image row has `document_type` set to
          `ContentTypeEnum.INFO_MSG` and its `metadata` extended with an
          `"info_message_metadata"` entry.

    Raises
    ------
    ValueError
        If `df` does not contain the necessary columns `document_type` and
        `metadata`.
    Exception
        Any other failure is logged and re-raised as the same exception type
        with a message prefixed by ``_cpu_only_apply_dedup_filter:``.

    Notes
    -----
    - The function operates only on rows where `document_type` is
      `ContentTypeEnum.IMAGE`. If there are none, all rows are returned
      unchanged.
    - Neither `df` nor the metadata objects it holds are modified; marked rows
      receive fresh metadata dictionaries.
    - The `"info_message_metadata"` entry mirrors what `_apply_dedup_filter`
      stores in the metadata struct on the GPU path.

    Examples
    --------
    >>> df = pd.DataFrame({
    ...     "document_type": [ContentTypeEnum.IMAGE, ContentTypeEnum.IMAGE, ContentTypeEnum.TEXT],
    ...     "metadata": [{"content": "image1"}, {"content": "image1"}, {"content": "text"}]
    ... })
    >>> result_df = _cpu_only_apply_dedup_filter(df, filter_flag=True)
    >>> list(result_df.index)
    [0, 2]
    """
    try:
        for col in ("document_type", "metadata"):
            if col not in df.columns:
                raise ValueError(f"_cpu_only_apply_dedup_filter: Missing required column '{col}'.")

        image_mask = df["document_type"] == ContentTypeEnum.IMAGE
        if not image_mask.any():
            return df[~image_mask]

        # Work with positional boolean flags so the logic does not depend on
        # the index labels being unique.
        image_flags = image_mask.to_numpy(dtype=bool)
        content_hash_sr = df.loc[image_flags, "metadata"].apply(hash_content, args=("md5",))

        # True exactly for image rows whose hash was already seen earlier.
        duplicate_flags = image_flags.copy()
        duplicate_flags[image_flags] = content_hash_sr.duplicated(keep="first").to_numpy()

        if filter_flag:
            # Both slices come straight from `df`, so the original column
            # order is preserved (no alphabetical re-sorting).
            df_result = pd.concat(
                [
                    df.loc[image_flags & ~duplicate_flags],
                    df.loc[~image_flags],
                ],
                axis=0,
            )
            return df_result

        info_msg = {
            "task": TaskTypeEnum.FILTER.value,
            "status": StatusEnum.SUCCESS.value,
            "message": "Filtered duplicate image.",
            "filter": True,
        }
        validated_info_msg = validate_schema(info_msg, InfoMessageMetadataSchema).model_dump()

        # Mark duplicates on a copy so the caller's frame stays untouched.
        df_result = df.copy()
        df_result["metadata"] = [
            {**metadata, "info_message_metadata": dict(validated_info_msg)} if is_duplicate else metadata
            for metadata, is_duplicate in zip(df["metadata"], duplicate_flags)
        ]
        df_result.loc[duplicate_flags, "document_type"] = ContentTypeEnum.INFO_MSG

        return df_result
    except Exception as e:
        err_msg = f"_cpu_only_apply_dedup_filter: Error applying deduplication filter. Original error: {e}"
        logger.error(err_msg, exc_info=True)
        raise type(e)(err_msg) from e


def _apply_dedup_filter(ctrl_msg: IngestControlMessage, filter_flag: bool) -> None:
    """
    Applies a deduplication filter to images within a DataFrame encapsulated in an IngestControlMessage.

    This is the cuDF counterpart of `_cpu_only_apply_dedup_filter`: duplicate
    images are identified by the MD5 hash of the `content` field of the
    `metadata` struct column, and are either dropped or marked as informational
    messages depending on `filter_flag`.

    Parameters
    ----------
    ctrl_msg : IngestControlMessage
        The control message containing the payload with the DataFrame to be filtered.
    filter_flag : bool
        A flag indicating whether to filter out duplicates (`True`) or mark
        them with informational messages (`False`).

    Returns
    -------
    None
        The result is written back through `ctrl_msg.payload(...)` as a pandas
        DataFrame. When the payload holds no image rows the message is left
        untouched.

    Raises
    ------
    Exception
        Any failure is logged and re-raised as the same exception type with a
        message prefixed by ``_apply_dedup_filter:``.

    Notes
    -----
    - The function operates only on rows where `document_type` is `ContentTypeEnum.IMAGE.value`.
    - When `filter_flag` is `True`, duplicates are removed from the DataFrame.
    - When `filter_flag` is `False`, duplicate images are marked with an
      informational message and the `document_type` is updated to
      `ContentTypeEnum.INFO_MSG.value`.
    - The `metadata` field in the DataFrame is exploded and restructured as needed.
    - NOTE(review): nothing in this module calls this function; `dedup_image_stage`
      uses the CPU implementation.
    - NOTE(review): assumes `ctrl_msg.payload()` returns an object exposing
      `mutable_dataframe()` that yields a cuDF DataFrame -- confirm against
      the current `IngestControlMessage` API.

    Examples
    --------
    >>> _apply_dedup_filter(ctrl_msg, filter_flag=True)
    >>> filtered_df = ctrl_msg.payload()
    """
    try:
        # Only the mask computation and the copy use the mutable view; every
        # later step works on the copy `gdf`.
        with ctrl_msg.payload().mutable_dataframe() as mdf:
            image_mask = mdf["document_type"] == ContentTypeEnum.IMAGE.value
            if not image_mask.any():
                return
            gdf = mdf.copy()

        base_cols = gdf.columns
        gdf_images = gdf.loc[image_mask]
        content_sr = gdf_images["metadata"].struct.field("content")
        content_hash_sr = content_sr.hash_values(method="md5", seed=None)
        gdf_images.loc[content_hash_sr.index, "_image_content_hash"] = content_hash_sr
        gdf_images_deduped = gdf_images.drop_duplicates(subset="_image_content_hash")
        deduped_indices = gdf_images_deduped.index
        duplicate_indices = gdf_images.loc[~gdf_images.index.isin(deduped_indices)].index

        if filter_flag:
            gdf_result = cudf.concat(
                [
                    gdf_images.loc[deduped_indices][gdf.columns.difference(["_image_content_hash"])],
                    gdf.loc[~image_mask],
                ],
                axis=0,
            )
            ctrl_msg.payload(gdf_result.to_pandas())
            return

        # Explode the metadata struct into one column per field so that the
        # info-message field can be replaced before re-packing the struct.
        gdf_temp = gdf["metadata"].struct.explode()
        exploded_metadata_cols = list(gdf_temp.columns)
        gdf[exploded_metadata_cols] = gdf_temp
        duplicate_images_gdf = gdf_images.loc[duplicate_indices]

        info_msg = {
            "task": TaskTypeEnum.FILTER.value,
            "status": StatusEnum.SUCCESS.value,
            "message": "Filtered duplicate image.",
            "filter": True,
        }
        validated_info_msg = validate_schema(info_msg, InfoMessageMetadataSchema).model_dump()

        duplicate_images_gdf["info_message_metadata"] = [validated_info_msg] * duplicate_images_gdf.shape[0]
        # Presumably `info_message_metadata` is one of the exploded struct
        # fields; it is dropped here and re-added from the duplicates frame.
        gdf.drop(labels=["info_message_metadata", "metadata"], inplace=True, axis=1)
        gdf["info_message_metadata"] = duplicate_images_gdf["info_message_metadata"]
        gdf.loc[duplicate_images_gdf["document_type"].index, "document_type"] = ContentTypeEnum.INFO_MSG.value
        gdf["metadata"] = gdf[exploded_metadata_cols + ["info_message_metadata"]].to_struct()
        # Remove the helper columns so only the original columns remain.
        gdf.drop(labels=gdf.columns.difference(base_cols), inplace=True, axis=1)

        ctrl_msg.payload(gdf.to_pandas())
        return
    except Exception as e:
        err_msg = f"_apply_dedup_filter: Error applying deduplication filter to control message. Original error: {e}"
        logger.error(err_msg, exc_info=True)
        raise type(e)(err_msg) from e


def dedup_image_stage(df: pd.DataFrame, task_props: Dict[str, Any], validated_config: Any) -> pd.DataFrame:
    """
    Deduplicates images in the provided DataFrame based on the task properties.

    The `filter` entry of `task_props["params"]` decides whether duplicate
    images are removed (`True`, the default) or marked with informational
    messages (`False`). The actual work is delegated to
    `_cpu_only_apply_dedup_filter`.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame containing the data to be deduplicated. It must have the
        `document_type` and `metadata` columns.
    task_props : dict of {str: Any}
        A dictionary of task properties, or a pydantic `BaseModel`, which is
        converted with `model_dump()`. A missing or `None` `params` entry is
        treated as an empty parameter set.
    validated_config : Any
        The validated configuration object for the deduplication task. It is
        accepted for interface compatibility and not read by this function.

    Returns
    -------
    pd.DataFrame
        The DataFrame with duplicates either filtered out or marked as
        informational messages, depending on the `filter` parameter.

    Raises
    ------
    Exception
        If deduplication processing fails. The failure is logged and re-raised
        as the same exception type with a message prefixed by
        ``dedup_image_stage:``.

    Examples
    --------
    >>> df = pd.DataFrame({
    ...     "document_type": [ContentTypeEnum.IMAGE, ContentTypeEnum.IMAGE, ContentTypeEnum.TEXT],
    ...     "metadata": [{"content": "image1"}, {"content": "image1"}, {"content": "text"}]
    ... })
    >>> task_props = {"params": {"filter": True}}
    >>> result_df = dedup_image_stage(df, task_props, validated_config)
    """
    try:
        if isinstance(task_props, BaseModel):
            task_props = task_props.model_dump()

        task_props.get("content_type")  # Preserve any side effects.

        # `params` may be absent or explicitly None (e.g. after model_dump());
        # both cases fall back to the defaults instead of failing on `.get`.
        task_params = task_props.get("params") or {}
        filter_flag = task_params.get("filter", True)

        logger.debug(f"dedup_image_stage: De-duplicating images with filter_flag={filter_flag}")

        df_result = _cpu_only_apply_dedup_filter(df, filter_flag)
        return df_result
    except Exception as e:
        err_msg = f"dedup_image_stage: Error during deduplication. Original error: {e}"
        logger.error(err_msg, exc_info=True)
        raise type(e)(err_msg) from e


def generate_dedup_stage(
    c: Config,
    dedup_config: Dict[str, Any],
    task: str = "dedup",
    task_desc: str = "dedup_images",
    pe_count: int = 1,
) -> MultiProcessingBaseStage:
    """
    Builds the multiprocessing stage that de-duplicates images.

    The raw configuration is validated with `ImageDedupSchema`, bound to
    `dedup_image_stage` through `functools.partial`, and the resulting callable
    is handed to a new `MultiProcessingBaseStage` as its `process_fn`.

    Parameters
    ----------
    c : Config
        The configuration object forwarded to the stage.
    dedup_config : dict of {str: Any}
        Keyword arguments used to construct `ImageDedupSchema`.
    task : str, optional
        The task name forwarded to the stage, by default "dedup".
    task_desc : str, optional
        The task description forwarded to the stage, by default "dedup_images".
    pe_count : int, optional
        The `pe_count` value forwarded to the stage, by default 1.

    Returns
    -------
    MultiProcessingBaseStage
        The stage, created with
        `filter_properties={"content_type": ContentTypeEnum.IMAGE.value}`.

    Raises
    ------
    Exception
        If validation or stage construction fails. The failure is logged and
        re-raised as the same exception type with a message prefixed by
        ``generate_dedup_stage:``.

    Examples
    --------
    >>> c = Config()
    >>> dedup_config = {"filter": True}
    >>> stage = generate_dedup_stage(c, dedup_config)
    """
    try:
        validated_config = ImageDedupSchema(**dedup_config)
        process_fn = functools.partial(dedup_image_stage, validated_config=validated_config)

        logger.debug(f"generate_dedup_stage: Generating deduplication stage with config: {validated_config}")

        image_only = {"content_type": ContentTypeEnum.IMAGE.value}
        stage = MultiProcessingBaseStage(
            c=c,
            pe_count=pe_count,
            task=task,
            task_desc=task_desc,
            process_fn=process_fn,
            filter_properties=image_only,
        )
    except Exception as e:
        err_msg = f"generate_dedup_stage: Error generating deduplication stage. Original error: {e}"
        logger.error(err_msg, exc_info=True)
        raise type(e)(err_msg) from e

    return stage