# Source code for nv_ingest.framework.orchestration.morpheus.stages.mutate.image_filter
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
from functools import partial
from typing import Any, Optional, List
from typing import Dict
import pandas as pd
from morpheus.config import Config
from nv_ingest_api.internal.enums.common import ContentTypeEnum
from nv_ingest.framework.orchestration.morpheus.stages.meta.multiprocessing_stage import MultiProcessingBaseStage
from nv_ingest_api.internal.mutate.filter import filter_images_internal
from nv_ingest_api.internal.schemas.transform.transform_image_filter_schema import ImageFilterSchema
logger = logging.getLogger(__name__)
[docs]
def image_filter_stage(
    df_ledger: pd.DataFrame,
    task_config: Dict[str, Any],
    mutate_config: ImageFilterSchema,
    execution_trace_log: Optional[List[Any]] = None,
) -> pd.DataFrame:
    """
    Apply the image filtering stage to the ledger DataFrame.

    The filtering parameters are read from ``task_config["params"]`` (an empty
    dict is used when the key is absent) and processing is delegated to
    ``filter_images_internal``.

    Parameters
    ----------
    df_ledger : pd.DataFrame
        The ledger DataFrame to filter. It is passed through unchanged to
        ``filter_images_internal``.
    task_config : Dict[str, Any]
        The task configuration. Only the ``"params"`` entry is used; it is
        forwarded as the ``task_config`` argument of ``filter_images_internal``.
    mutate_config : ImageFilterSchema
        Stage-level configuration, passed directly to ``filter_images_internal``.
    execution_trace_log : Optional[List[Any]], optional
        An optional list for execution trace logging, by default None.

    Returns
    -------
    pd.DataFrame
        The value returned by ``filter_images_internal``.

    Raises
    ------
    Exception
        Any exception raised during filtering is logged. It is then re-raised as
        a new instance of the same type carrying a contextual message (chained
        to the original). If that type cannot be constructed from a single
        message string, the original exception is re-raised unchanged.
    """
    try:
        task_params: Dict[str, Any] = task_config.get("params", {})

        return filter_images_internal(
            df_ledger=df_ledger,
            task_config=task_params,
            mutate_config=mutate_config,
            execution_trace_log=execution_trace_log,
        )
    except Exception as e:
        err_msg = f"image_filter_stage: Error filtering images. Original error: {e}"
        logger.error(err_msg, exc_info=True)

        # Not every exception class accepts a lone message argument (for example
        # UnicodeDecodeError needs five positional arguments). Building the
        # wrapper blindly would raise a TypeError here and hide the real failure.
        try:
            wrapped = type(e)(err_msg)
        except Exception:
            wrapped = None

        if wrapped is None:
            # Fall back to the original exception so its type is preserved; the
            # contextual message has already been written to the log above.
            raise

        raise wrapped from e
[docs]
def generate_image_filter_stage(
    c: Config,
    image_filter_config: Dict[str, Any],
    task: str = "filter",
    task_desc: str = "image_filter",
    pe_count: int = 8,
) -> MultiProcessingBaseStage:
    """
    Generate an image filter stage with the specified configuration.

    The configuration dict is used to build an ``ImageFilterSchema``. That
    schema is then bound as the ``mutate_config`` argument of
    ``image_filter_stage``, and the resulting callable is handed to
    ``MultiProcessingBaseStage`` as its ``process_fn``.

    Parameters
    ----------
    c : Config
        The global configuration object.
    image_filter_config : Dict[str, Any]
        Keyword arguments used to construct the ``ImageFilterSchema``.
    task : str, optional
        The task name to be assigned to the stage. Default is "filter".
    task_desc : str, optional
        A descriptor for latency tracing. Default is "image_filter".
    pe_count : int, optional
        The number of processing elements to use. Default is 8.

    Returns
    -------
    MultiProcessingBaseStage
        The stage, created with ``filter_properties`` restricting it to the
        image content type.

    Raises
    ------
    Exception
        Any exception raised during stage generation is logged. It is then
        re-raised as a new instance of the same type carrying a contextual
        message (chained to the original). If that type cannot be constructed
        from a single message string, the original exception is re-raised
        unchanged.
    """
    try:
        validated_config = ImageFilterSchema(**image_filter_config)
        wrapped_filter_fn = partial(image_filter_stage, mutate_config=validated_config)

        logger.debug(
            f"Generating image filtering stage with {pe_count} processing elements. "
            f"Task: {task}, Document Type: {ContentTypeEnum.IMAGE.value}"
        )

        return MultiProcessingBaseStage(
            c=c,
            pe_count=pe_count,
            task=task,
            task_desc=task_desc,
            process_fn=wrapped_filter_fn,
            filter_properties={"content_type": ContentTypeEnum.IMAGE.value},
        )
    except Exception as e:
        err_msg = f"generate_image_filter_stage: Error generating image filter stage. Original error: {e}"
        logger.error(err_msg, exc_info=True)

        # Building the schema above may fail with an exception class that cannot
        # be constructed from a lone message string (presumably a schema
        # validation error -- verify against ImageFilterSchema's base class).
        # Building the wrapper blindly would raise a TypeError here and hide the
        # real configuration problem.
        try:
            wrapped = type(e)(err_msg)
        except Exception:
            wrapped = None

        if wrapped is None:
            # Fall back to the original exception so its type and details are
            # preserved; the contextual message has already been logged above.
            raise

        raise wrapped from e