nv_ingest.framework.orchestration.morpheus.stages.mutate package
Submodules
nv_ingest.framework.orchestration.morpheus.stages.mutate.image_dedup module
- nv_ingest.framework.orchestration.morpheus.stages.mutate.image_dedup.dedup_image_stage(df_ledger: DataFrame, task_config: Dict[str, Any], mutate_config: ImageDedupSchema)
Deduplicates images in the provided DataFrame based on the task properties.
This function processes a DataFrame containing images and applies a deduplication filter controlled by the filter parameter in the task properties. Duplicate images are either removed or kept and marked with informational messages, depending on the value of that filter flag.
- Parameters:
df_ledger (pd.DataFrame) – The DataFrame containing the data to be deduplicated. It must include the document_type and metadata columns.
task_config (dict of {str: Any}) – A dictionary containing task properties, which may include the content type and parameters for filtering.
mutate_config (ImageDedupSchema) – The validated configuration object containing settings related to the deduplication task.
- Returns:
The DataFrame with duplicates either filtered out or marked as informational messages, depending on the filter_flag.
- Return type:
pd.DataFrame
Notes
The deduplication process operates on the rows where document_type is ContentTypeEnum.IMAGE.
The filter flag, read from task_config["params"]["filter"], determines whether duplicates are removed or marked.
Examples
>>> df_ledger = pd.DataFrame({
...     "document_type": [ContentTypeEnum.IMAGE, ContentTypeEnum.IMAGE, ContentTypeEnum.TEXT],
...     "metadata": [{"content": "image1"}, {"content": "image1"}, {"content": "text"}],
... })
>>> task_config = {"params": {"filter": True}}
>>> mutate_config = ImageDedupSchema()
>>> result_df = dedup_image_stage(df_ledger, task_config, mutate_config)
>>> print(result_df)
- Raises:
Exception – If deduplication processing fails.
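As a rough illustration of the deduplication described above, here is a stdlib-only sketch, not the nv_ingest implementation. It assumes each ledger row is a plain dict with document_type and metadata keys (standing in for a DataFrame row), that duplicates are detected by hashing metadata["content"] with SHA-256, and that a kept duplicate is marked through a hypothetical info_message field. The filter_flag argument plays the role of the filter task parameter; the helper name dedup_images_sketch is hypothetical.

```python
import hashlib
from typing import Any, Dict, List

IMAGE = "image"  # assumption: stands in for ContentTypeEnum.IMAGE


def dedup_images_sketch(rows: List[Dict[str, Any]], filter_flag: bool) -> List[Dict[str, Any]]:
    """Drop (filter_flag=True) or mark (filter_flag=False) duplicate image rows.

    Hypothetical helper: rows are plain dicts standing in for DataFrame rows.
    """
    seen = set()
    result = []
    for row in rows:
        # Only image rows take part in deduplication; everything else passes through.
        if row["document_type"] != IMAGE:
            result.append(row)
            continue
        # Assumption: identity is the SHA-256 digest of metadata["content"].
        digest = hashlib.sha256(row["metadata"]["content"].encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            result.append(row)
        elif not filter_flag:
            # Keep the duplicate but mark it (hypothetical marker field); copy to avoid mutating input.
            marked = dict(row)
            marked["info_message"] = "duplicate image"
            result.append(marked)
        # With filter_flag=True, later duplicates are simply dropped.
    return result


rows = [
    {"document_type": IMAGE, "metadata": {"content": "image1"}},
    {"document_type": IMAGE, "metadata": {"content": "image1"}},
    {"document_type": "text", "metadata": {"content": "text"}},
]
print(len(dedup_images_sketch(rows, filter_flag=True)))   # 2
print(len(dedup_images_sketch(rows, filter_flag=False)))  # 3
```

Comparing fixed-size digests instead of raw content keeps the set of seen items small even when image payloads are large.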
- nv_ingest.framework.orchestration.morpheus.stages.mutate.image_dedup.generate_dedup_stage(c: Config, deduplicate_image_config: Dict[str, Any], task: str = 'dedup', task_desc: str = 'dedup_images', pe_count: int = 8)
Generates a deduplication processing stage for images using multiprocessing.
This function validates the deduplication configuration, wraps the dedup_image_stage function with the validated configuration, and then generates a MultiProcessingBaseStage for executing the deduplication task.
- Parameters:
c (Config) – The configuration object used to set up the multiprocessing stage.
deduplicate_image_config (dict of {str: Any}) – A dictionary containing the deduplication configuration parameters.
task (str, optional) – The name of the task to be performed, by default “dedup”.
task_desc (str, optional) – A description of the task, by default “dedup_images”.
pe_count (int, optional) – The number of processing elements (workers) to use for the task, by default 8.
- Returns:
An instance of MultiProcessingBaseStage configured to perform the deduplication task.
- Return type:
MultiProcessingBaseStage
Notes
The dedup_image_stage function is partially applied with the validated configuration, allowing it to be used within the multiprocessing framework.
The stage is configured to process image content only; this restriction is set through its filter_properties.
Examples
>>> c = Config()
>>> dedup_config = {"filter": True}
>>> stage = generate_dedup_stage(c, dedup_config)
>>> pipe.add_stage(stage)  # pipe: an existing Morpheus pipeline (assumed)
- Raises:
Exception – If an error occurs during stage generation.
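The Notes above describe validating the configuration once and binding it to the stage function through partial application. The sketch below illustrates that pattern with functools.partial. DedupConfig is a hypothetical stand-in for ImageDedupSchema (its hash_algorithm field is illustrative only), and make_stage is a hypothetical stand-in for constructing a MultiProcessingBaseStage, so the "stage" here is just a dict.

```python
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class DedupConfig:
    """Hypothetical stand-in for ImageDedupSchema; the field is illustrative only."""

    hash_algorithm: str = "sha256"


def dedup_stage_fn(df_ledger: Any, task_config: Dict[str, Any], mutate_config: DedupConfig) -> Dict[str, Any]:
    """Placeholder for the per-message work; echoes what it received."""
    return {"df": df_ledger, "params": task_config.get("params", {}), "config": mutate_config}


def make_stage(process_fn: Callable[..., Any], task: str, task_desc: str, pe_count: int) -> Dict[str, Any]:
    """Hypothetical stand-in for constructing a MultiProcessingBaseStage."""
    return {"process_fn": process_fn, "task": task, "task_desc": task_desc, "pe_count": pe_count}


def generate_dedup_stage_sketch(
    raw_config: Dict[str, Any],
    task: str = "dedup",
    task_desc: str = "dedup_images",
    pe_count: int = 8,
) -> Dict[str, Any]:
    # Validate once, up front: unknown keys raise TypeError here rather than inside a worker.
    validated = DedupConfig(**raw_config)
    # Bind the validated config so the stage only has to supply (df_ledger, task_config).
    process_fn = partial(dedup_stage_fn, mutate_config=validated)
    return make_stage(process_fn, task=task, task_desc=task_desc, pe_count=pe_count)


stage = generate_dedup_stage_sketch({})
out = stage["process_fn"]("ledger", {"params": {"filter": True}})
print(out["config"])  # DedupConfig(hash_algorithm='sha256')
```

One reason to prefer partial over a nested closure in this setting is that a partial of a module-level function with picklable arguments can itself be pickled, which matters if work is shipped to worker processes.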
nv_ingest.framework.orchestration.morpheus.stages.mutate.image_filter module
- nv_ingest.framework.orchestration.morpheus.stages.mutate.image_filter.generate_image_filter_stage(c: Config, image_filter_config: Dict[str, Any], task: str = 'filter', task_desc: str = 'image_filter', pe_count: int = 8)
Generate an image filter stage with the specified configuration.
This function validates the image filter configuration and wraps the image_filter_stage function to produce a multi-processing stage for filtering images.
- Parameters:
c (Config) – The global configuration object.
image_filter_config (Dict[str, Any]) – A dictionary containing configuration parameters for image filtering.
task (str, optional) – The task name to be assigned to the stage. Default is “filter”.
task_desc (str, optional) – A descriptor for latency tracing. Default is “image_filter”.
pe_count (int, optional) – The number of processing elements to use. Default is 8.
- Returns:
The generated multi-processing stage configured for image filtering.
- Return type:
MultiProcessingBaseStage
- Raises:
Exception – Any exception raised during stage generation is logged and re-raised with additional context.
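The Raises entry says errors are logged and re-raised with additional context. A common Python idiom for this is `raise ... from err`, which keeps the original exception available as `__cause__`. The sketch below is hypothetical: build_filter_stage stands in for the real stage construction, and the choice of RuntimeError and its message are assumptions, not what nv_ingest actually raises.

```python
import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)


def build_filter_stage(image_filter_config: Any) -> Dict[str, Any]:
    """Hypothetical stage builder that rejects configs that are not dicts."""
    if not isinstance(image_filter_config, dict):
        raise TypeError("image_filter_config must be a dict")
    return {"task": "filter", "config": image_filter_config}


def generate_image_filter_stage_sketch(image_filter_config: Any) -> Dict[str, Any]:
    try:
        return build_filter_stage(image_filter_config)
    except Exception as err:
        # Log with the traceback attached, then re-raise with added context.
        logger.exception("Failed to generate image filter stage")
        # `from err` chains the original exception as __cause__ (RuntimeError is an assumed choice).
        raise RuntimeError(f"generate_image_filter_stage_sketch: {err}") from err


try:
    generate_image_filter_stage_sketch("not-a-dict")
except RuntimeError as err:
    print(type(err.__cause__).__name__)  # TypeError
```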
- nv_ingest.framework.orchestration.morpheus.stages.mutate.image_filter.image_filter_stage(df_ledger: DataFrame, task_config: Dict[str, Any], mutate_config: ImageFilterSchema, execution_trace_log: List[Any] | None = None)
Apply the image filtering stage to the ledger DataFrame.
This function extracts image filtering parameters from the provided task configuration and delegates processing to the internal filter_images_internal function.
- Parameters:
df_ledger (pd.DataFrame) – The ledger DataFrame containing image metadata. This DataFrame must include the required columns for filtering.
task_config (Dict[str, Any]) – A dictionary containing the task configuration. Expected to have a key “params” holding the filtering parameters.
mutate_config (ImageFilterSchema) – Additional mutation configuration, passed directly to the internal function.
execution_trace_log (Optional[List[Any]], optional) – An optional list for execution trace logging, by default None.
- Returns:
The resulting DataFrame after the image filtering stage has been applied.
- Return type:
pd.DataFrame
- Raises:
Exception – Any exception raised during the filtering process is logged and re-raised with additional context.
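image_filter_stage reads its parameters from task_config["params"] and hands the ledger to an internal function. The stdlib-only sketch below mirrors that shape under loudly stated assumptions: rows are dicts standing in for DataFrame rows, filter_images_sketch stands in for filter_images_internal, and the min_size criterion, its default, the width/height keys, and the trace record format are all hypothetical, not documented nv_ingest parameters.

```python
from typing import Any, Dict, List, Optional


def filter_images_sketch(rows: List[Dict[str, Any]], min_size: int) -> List[Dict[str, Any]]:
    """Hypothetical stand-in for filter_images_internal.

    Keeps every non-image row, and keeps image rows only if both sides are >= min_size.
    """
    kept = []
    for row in rows:
        if row["document_type"] != "image":
            kept.append(row)
        elif row["width"] >= min_size and row["height"] >= min_size:
            kept.append(row)
    return kept


def image_filter_stage_sketch(
    rows: List[Dict[str, Any]],
    task_config: Dict[str, Any],
    execution_trace_log: Optional[List[Any]] = None,
) -> List[Dict[str, Any]]:
    # Filtering parameters are read from the "params" key; a missing key means no overrides.
    params = task_config.get("params", {})
    # Hypothetical parameter name and default, for illustration only.
    min_size = int(params.get("min_size", 128))
    result = filter_images_sketch(rows, min_size=min_size)
    if execution_trace_log is not None:
        # Hypothetical trace record: how many rows this stage removed.
        execution_trace_log.append({"stage": "image_filter", "removed": len(rows) - len(result)})
    return result


rows = [
    {"document_type": "image", "width": 64, "height": 64},
    {"document_type": "image", "width": 256, "height": 300},
    {"document_type": "text", "width": 0, "height": 0},
]
trace: List[Any] = []
kept = image_filter_stage_sketch(rows, {"params": {"min_size": 100}}, trace)
print(len(kept), trace)  # 2 [{'stage': 'image_filter', 'removed': 1}]
```

Because execution_trace_log defaults to None, callers that do not trace pay nothing, and callers that do can pass one shared list through several stages.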
Module contents
- nv_ingest.framework.orchestration.morpheus.stages.mutate.generate_dedup_stage(c: Config, deduplicate_image_config: Dict[str, Any], task: str = 'dedup', task_desc: str = 'dedup_images', pe_count: int = 8)
Package-level re-export of nv_ingest.framework.orchestration.morpheus.stages.mutate.image_dedup.generate_dedup_stage; parameters, return value, notes, and examples are as documented for that function above.
- nv_ingest.framework.orchestration.morpheus.stages.mutate.generate_image_filter_stage(c: Config, image_filter_config: Dict[str, Any], task: str = 'filter', task_desc: str = 'image_filter', pe_count: int = 8)
Package-level re-export of nv_ingest.framework.orchestration.morpheus.stages.mutate.image_filter.generate_image_filter_stage; parameters, return value, and raised exceptions are as documented for that function above.