Source code for nv_ingest_api.internal.mutate.filter

# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import logging
from typing import Dict, Optional, List, Any

import pandas as pd

from nv_ingest_api.internal.enums.common import TaskTypeEnum
from nv_ingest_api.internal.schemas.meta.metadata_schema import (
    ContentTypeEnum,
    InfoMessageMetadataSchema,
    StatusEnum,
)
from nv_ingest_api.internal.schemas.transform.transform_image_filter_schema import ImageFilterSchema
from nv_ingest_api.util.schema.schema_validator import validate_schema

logger = logging.getLogger(__name__)


def _add_info_message(x, info_msg):
    x["info_message_metadata"] = info_msg

    return x


def _calculate_average_image_size(x):
    return (x["image_metadata"]["width"] + x["image_metadata"]["height"]) / 2


def _calculate_aspect_ratio(x):
    return x["image_metadata"]["width"] / max(x["image_metadata"]["height"], 1e-9)


[docs] def filter_images_internal( df_ledger: pd.DataFrame, task_config: Dict[str, Any], mutate_config: ImageFilterSchema = ImageFilterSchema(), execution_trace_log: Optional[List[Any]] = None, ) -> pd.DataFrame: """ Apply an image filtering operation to a DataFrame based on average image size and aspect ratio. Parameters ---------- df_ledger : pd.DataFrame DataFrame to be filtered. Must contain 'document_type' and 'metadata' columns. task_config : dict Dictionary with the following keys: - "min_size": Minimum average image size threshold. - "max_aspect_ratio": Maximum allowed aspect ratio. - "min_aspect_ratio": Minimum allowed aspect ratio. - "filter": If True, rows failing the criteria are dropped; if False, they are flagged. mutate_config : ImageFilterSchema execution_trace_log : Optional[List[Any]], optional Returns ------- pd.DataFrame The updated DataFrame after applying the image filter. Raises ------ ValueError If required columns are missing or if parameters are invalid. Exception For other errors encountered during filtering. """ _ = mutate_config # Unused variable _ = execution_trace_log # TODO(Devin) try: required_columns = {"document_type", "metadata"} if not required_columns.issubset(df_ledger.columns): raise ValueError(f"DataFrame must contain columns: {required_columns}") min_size = task_config.get("min_size") max_aspect_ratio = task_config.get("max_aspect_ratio") min_aspect_ratio = task_config.get("min_aspect_ratio") filter_flag = task_config.get("filter", True) if not isinstance(min_size, (int, float)) or min_size < 0: raise ValueError("min_size must be a non-negative number") if not isinstance(max_aspect_ratio, (int, float)) or max_aspect_ratio <= 0: raise ValueError("max_aspect_ratio must be a positive number") if not isinstance(min_aspect_ratio, (int, float)) or min_aspect_ratio <= 0: raise ValueError("min_aspect_ratio must be a positive number") if min_aspect_ratio > max_aspect_ratio: raise ValueError("min_aspect_ratio cannot be greater than max_aspect_ratio") image_mask = df_ledger["document_type"] == ContentTypeEnum.IMAGE if not image_mask.any(): return df_ledger.copy() df_image = df_ledger.loc[image_mask].copy() avg_size = df_image["metadata"].apply(_calculate_average_image_size) avg_size_mask = avg_size > min_size aspect_ratio = df_image["metadata"].apply(_calculate_aspect_ratio) min_aspect_ratio_mask = aspect_ratio > min_aspect_ratio max_aspect_ratio_mask = aspect_ratio < max_aspect_ratio valid_mask = avg_size_mask & min_aspect_ratio_mask & max_aspect_ratio_mask image_filter_mask = ~valid_mask if image_filter_mask.any(): filtered_df = df_image.loc[image_filter_mask].copy() if filter_flag: df_ledger.drop(labels=filtered_df.index, inplace=True) return df_ledger info_msg = { "task": TaskTypeEnum.FILTER.value, "status": StatusEnum.SUCCESS.value, "message": "Filtered due to image size or aspect ratio.", "filter": True, } validated_info_msg = validate_schema(info_msg, InfoMessageMetadataSchema).model_dump() filtered_df["info_message_metadata"] = [validated_info_msg] * filtered_df.shape[0] filtered_df["metadata"] = filtered_df["metadata"].apply(_add_info_message, args=(info_msg,)) df_ledger.loc[filtered_df.index, "metadata"] = filtered_df["metadata"] df_ledger.loc[filtered_df.index, "document_type"] = ContentTypeEnum.INFO_MSG result, execution_trace_log = df_ledger, {} return result except Exception as e: err_msg = f"filter_images_internal: Error applying image filter. Original error: {e}" logger.error(err_msg, exc_info=True) raise type(e)(err_msg) from e