stages.image.deduplication.removal

Module Contents

Classes

ImageDuplicatesRemovalStage

Filter stage that removes images whose IDs appear in a Parquet file.

API

class stages.image.deduplication.removal.ImageDuplicatesRemovalStage

Bases: nemo_curator.stages.base.ProcessingStage[nemo_curator.tasks.ImageBatch, nemo_curator.tasks.ImageBatch]

Filter stage that removes images whose IDs appear in a Parquet file.

The Parquet files must contain a column of image identifiers. By default this column is assumed to be named id, matching the writer metadata. Set duplicate_id_field to use a different column name.

Args:

- removal_parquets_dir: Directory containing Parquet files with the image IDs to remove.
- duplicate_id_field: Name of the column containing the image IDs to remove.
- verbose: Whether to log verbose output.
- num_workers_per_node: Number of workers per node for the stage. Limiting this is sometimes needed to avoid out-of-memory (OOM) errors when several actors on one node each load the same removal Parquet files into memory.
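The filtering behaviour described above can be sketched in plain Python. This is a minimal illustration, not the library's implementation: FakeImage and drop_listed_ids are hypothetical names, and an in-memory set stands in for the IDs the real stage reads from the Parquet files.

```python
from dataclasses import dataclass


@dataclass
class FakeImage:
    """Hypothetical stand-in for an image record; not a NeMo Curator type."""
    image_id: str
    path: str


def drop_listed_ids(images: list[FakeImage], ids_to_remove: set[str]) -> list[FakeImage]:
    """Keep only the images whose ID is absent from the removal set."""
    return [img for img in images if img.image_id not in ids_to_remove]


batch = [FakeImage("a", "a.jpg"), FakeImage("b", "b.jpg"), FakeImage("c", "c.jpg")]
kept = drop_listed_ids(batch, {"b"})
print([img.image_id for img in kept])  # ['a', 'c']
```

Holding the IDs in a set keeps each membership check at constant expected time, which matters when the removal list is much larger than a single batch.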

duplicate_id_field: str

'id'

inputs() -> tuple[list[str], list[str]]

Define stage input requirements.

Returns (tuple[list[str], list[str]]): Tuple of (required_top_level_attributes, required_data_attributes) where:

- required_top_level_attributes: List of task attributes that must be present
- required_data_attributes: List of attributes within the data that must be present
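As a rough illustration of how such a pair of lists could be checked against a task, the sketch below reports which requirements are unmet. The helper missing_requirements and the attribute names data, image_id and embedding are assumptions made for the example; this page does not state which values inputs() actually returns for this stage.

```python
from types import SimpleNamespace


def missing_requirements(task, required_attrs, required_data_attrs):
    """Return the names from both lists that the task does not satisfy."""
    # Top-level check: the task object itself must expose each attribute.
    missing = [name for name in required_attrs if not hasattr(task, name)]
    # Data-level check: every item inside task.data must expose each attribute.
    for item in getattr(task, "data", []):
        for name in required_data_attrs:
            if not hasattr(item, name) and name not in missing:
                missing.append(name)
    return missing


# Hypothetical task with one data item that only carries an image_id.
task = SimpleNamespace(data=[SimpleNamespace(image_id="a")])
unmet = missing_requirements(task, ["data"], ["image_id", "embedding"])
print(unmet)  # ['embedding']
```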

num_workers_per_node: int | None

None
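One way to pick a value for num_workers_per_node is a back-of-the-envelope memory estimate. The sketch below assumes each worker holds its own full copy of the removal IDs and that nothing is shared between workers; the helper name and the byte figures are illustrative only, not measurements.

```python
def max_workers_for_budget(id_set_bytes: int, node_budget_bytes: int) -> int:
    """Largest worker count whose combined ID copies fit in the memory budget.

    Returns 0 when even a single copy does not fit.
    """
    if id_set_bytes <= 0:
        raise ValueError("id_set_bytes must be positive")
    return node_budget_bytes // id_set_bytes


GIB = 1024 ** 3
# Hypothetical figures: 6 GiB per in-memory copy, 32 GiB reserved for this stage.
workers = max_workers_for_budget(6 * GIB, 32 * GIB)
print(workers)  # 5
```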

outputs() -> tuple[list[str], list[str]]

Define stage output specification.

Returns (tuple[list[str], list[str]]): Tuple of (output_top_level_attributes, output_data_attributes) where:

- output_top_level_attributes: List of task attributes this stage adds/modifies
- output_data_attributes: List of attributes within the data that this stage adds/modifies

process(task: nemo_curator.tasks.ImageBatch) -> nemo_curator.tasks.ImageBatch

Process a task and return the result. For this stage, the input type X and the output type Y are both nemo_curator.tasks.ImageBatch.

Args:

- task (X): Input task to process

Returns (Y | list[Y]):

- Single task: For 1-to-1 transformations
- List of tasks: For 1-to-many transformations (e.g., readers)
- None: If the task should be filtered out
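The three return shapes listed above (a single task, a list of tasks, or None) can be handled uniformly by a caller. The helper below is a hypothetical sketch of that normalization, not part of NeMo Curator; plain strings stand in for task objects.

```python
def normalize_result(result):
    """Turn a process() result into a flat list of zero or more tasks."""
    if result is None:
        return []          # task was filtered out
    if isinstance(result, list):
        return result      # 1-to-many transformation
    return [result]        # 1-to-1 transformation


print(normalize_result(None))        # []
print(normalize_result("task"))      # ['task']
print(normalize_result(["a", "b"]))  # ['a', 'b']
```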

removal_parquets_dir: str

None

setup(_worker_metadata=None) -> None

Setup method called once before processing begins. Override this method to perform any initialization that should happen once per worker.

Args:

- _worker_metadata (WorkerMetadata, optional): Information about the worker (provided by some backends)
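The once-per-worker pattern can be sketched as follows. SketchRemovalStage and its load_ids callable are hypothetical; this page does not say whether the real stage loads its Parquet files in setup, so treat this only as an illustration of why expensive loading belongs there rather than in process.

```python
class SketchRemovalStage:
    """Hypothetical stage showing the load-once-in-setup pattern."""

    def __init__(self, load_ids):
        self._load_ids = load_ids      # callable returning an iterable of IDs
        self._ids_to_remove = set()
        self.load_calls = 0            # counts how often the IDs were loaded

    def setup(self, _worker_metadata=None):
        # Expensive work happens here, once per worker.
        self._ids_to_remove = set(self._load_ids())
        self.load_calls += 1

    def process(self, image_ids):
        # Each call reuses the set built in setup().
        return [i for i in image_ids if i not in self._ids_to_remove]


stage = SketchRemovalStage(lambda: ["b", "d"])
stage.setup()
first = stage.process(["a", "b"])
second = stage.process(["c", "d", "e"])
print(first, second, stage.load_calls)  # ['a'] ['c', 'e'] 1
```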

verbose: bool

False

xenna_stage_spec() -> dict[str, Any]

Get Xenna configuration for this stage.

Returns (dict[str, Any]): Dictionary containing Xenna-specific configuration