# Source code for nv_ingest.framework.orchestration.morpheus.stages.mutate.image_dedup

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import functools
import logging
from typing import Any
from typing import Dict

import pandas as pd
from morpheus.config import Config
from morpheus.utils.module_utils import ModuleLoaderFactory

from nv_ingest_api.internal.enums.common import ContentTypeEnum
from nv_ingest.framework.orchestration.morpheus.stages.meta.multiprocessing_stage import MultiProcessingBaseStage
from nv_ingest_api.internal.mutate.deduplicate import deduplicate_images_internal
from nv_ingest_api.internal.schemas.mutate.mutate_image_dedup_schema import ImageDedupSchema

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Identifiers under which the image-dedup module is exposed, together with the
# schema class used for its configuration.
# NOTE(review): presumably ModuleLoaderFactory registers/loads a Morpheus module
# by (name, namespace) and validates its config against ImageDedupSchema —
# confirm against morpheus.utils.module_utils.
MODULE_NAME = "dedup_images"
MODULE_NAMESPACE = "nv-ingest"
ImageDedupLoaderFactory = ModuleLoaderFactory(MODULE_NAME, MODULE_NAMESPACE, ImageDedupSchema)


def dedup_image_stage(
    df_ledger: pd.DataFrame, task_config: Dict[str, Any], mutate_config: ImageDedupSchema
) -> pd.DataFrame:
    """
    Run image deduplication over a ledger DataFrame.

    This is a thin wrapper around `deduplicate_images_internal`: it forwards
    `df_ledger` and `mutate_config`, supplies a fixed task configuration of
    ``{"hash_algorithm": "md5"}``, and returns whatever the internal function
    returns.

    Parameters
    ----------
    df_ledger : pd.DataFrame
        The DataFrame to deduplicate. It is passed unchanged to
        `deduplicate_images_internal`.
    task_config : dict of {str: Any}
        Task properties supplied by the caller. NOTE: this value is currently
        NOT forwarded; the hash algorithm is hard-coded to md5 until it is made
        configurable (see the TODO in the body).
    mutate_config : ImageDedupSchema
        The validated deduplication configuration, forwarded as-is.

    Returns
    -------
    pd.DataFrame
        The DataFrame produced by `deduplicate_images_internal`.

    Raises
    ------
    Exception
        If deduplication fails. The error is logged and re-raised as the same
        exception type with a contextual message, chained to the original. If
        that exception type cannot be constructed from a single message, the
        original exception is re-raised unchanged.

    Examples
    --------
    >>> df_ledger = pd.DataFrame({
    ...     "document_type": ["IMAGE", "IMAGE", "TEXT"],
    ...     "metadata": [{"content": "image1"}, {"content": "image1"}, {"content": "text"}]
    ... })
    >>> result_df = dedup_image_stage(df_ledger, {}, ImageDedupSchema())  # doctest: +SKIP
    """
    try:
        # TODO(Devin): Make hash algo configurable
        # A separate local is used so the caller-supplied `task_config` is not
        # silently shadowed; only the fixed md5 setting is forwarded for now.
        internal_task_config = {"hash_algorithm": "md5"}

        df_result = deduplicate_images_internal(
            df_ledger=df_ledger,
            task_config=internal_task_config,
            mutate_config=mutate_config,
            execution_trace_log=None,
        )

        return df_result
    except Exception as e:
        err_msg = f"dedup_image_stage: Error during deduplication. Original error: {e}"
        logger.error(err_msg, exc_info=True)

        # Not every exception class accepts a lone message argument (e.g.
        # UnicodeDecodeError, pydantic's ValidationError). Building the wrapper
        # must never replace the real failure with an unrelated TypeError.
        try:
            wrapped = type(e)(err_msg)
        except Exception:
            wrapped = None

        if wrapped is None:
            # Fall back to the original exception; the context was logged above.
            raise
        raise wrapped from e


def generate_dedup_stage(
    c: Config,
    deduplicate_image_config: Dict[str, Any],
    task: str = "dedup",
    task_desc: str = "dedup_images",
    pe_count: int = 8,
) -> MultiProcessingBaseStage:
    """
    Build a multiprocessing stage that runs image deduplication.

    The raw configuration dictionary is validated by constructing an
    `ImageDedupSchema` from it. `dedup_image_stage` is then partially applied
    with that validated configuration as `mutate_config`, and the resulting
    callable is handed to `MultiProcessingBaseStage` as its `process_fn`.

    Parameters
    ----------
    c : Config
        The Morpheus configuration object passed to the stage.
    deduplicate_image_config : dict of {str: Any}
        Deduplication settings; expanded as keyword arguments into
        `ImageDedupSchema` for validation.
    task : str, optional
        The task name passed to the stage, by default "dedup".
    task_desc : str, optional
        The task description passed to the stage, by default "dedup_images".
    pe_count : int, optional
        The `pe_count` passed to the stage, by default 8.

    Returns
    -------
    MultiProcessingBaseStage
        A stage configured with the wrapped `dedup_image_stage` and
        ``filter_properties={"content_type": ContentTypeEnum.IMAGE.value}``.

    Raises
    ------
    Exception
        If validation or stage construction fails. The error is logged and
        re-raised as the same exception type with a contextual message, chained
        to the original. If that exception type cannot be constructed from a
        single message (as is the case for pydantic validation errors), the
        original exception is re-raised unchanged.

    Examples
    --------
    >>> c = Config()  # doctest: +SKIP
    >>> deduplicate_image_config = {}
    >>> stage = generate_dedup_stage(c, deduplicate_image_config)  # doctest: +SKIP
    """
    try:
        validated_config = ImageDedupSchema(**deduplicate_image_config)

        # Bind the validated config now so the stage only has to supply the
        # remaining arguments of dedup_image_stage when it invokes process_fn.
        _wrapped_dedup_image_stage = functools.partial(dedup_image_stage, mutate_config=validated_config)

        logger.debug(f"generate_dedup_stage: Generating deduplication stage with config: {validated_config}")

        return MultiProcessingBaseStage(
            c=c,
            pe_count=pe_count,
            task=task,
            task_desc=task_desc,
            process_fn=_wrapped_dedup_image_stage,
            filter_properties={"content_type": ContentTypeEnum.IMAGE.value},
        )
    except Exception as e:
        err_msg = f"generate_dedup_stage: Error generating deduplication stage. Original error: {e}"
        logger.error(err_msg, exc_info=True)

        # Schema validation errors (and several builtin exceptions) cannot be
        # instantiated from a lone message string. Building the wrapper must
        # never replace the real failure with an unrelated TypeError.
        try:
            wrapped = type(e)(err_msg)
        except Exception:
            wrapped = None

        if wrapped is None:
            # Fall back to the original exception; the context was logged above.
            raise
        raise wrapped from e