stages.deduplication.semantic.pairwise_io#

Module Contents#

Classes#

ClusterWiseFilePartitioningStage

Stage that partitions input files into PairwiseFileGroupTasks for deduplication.

API#

class stages.deduplication.semantic.pairwise_io.ClusterWiseFilePartitioningStage(
input_path: str,
storage_options: dict[str, Any] | None = None,
)#

Bases: nemo_curator.stages.base.ProcessingStage[nemo_curator.tasks._EmptyTask, nemo_curator.tasks.FileGroupTask]

Stage that partitions input files into PairwiseFileGroupTasks for deduplication.

This stage takes an EmptyTask as input and outputs partition-aware file groups. It reads parquet files partitioned by centroid (from kmeans output) and creates one PairwiseFileGroupTask per centroid partition.

Initialization

Initialize the partitioning stage.

Args: input_path: Path to the kmeans output directory containing centroid partitions storage_options: Storage options for reading files limit: Maximum number of partitions to process

inputs() tuple[list[str], list[str]]#

Define stage input requirements.

Returns (tuple[list[str], list[str]]): Tuple of (required_attributes, required_columns) where: - required_top_level_attributes: List of task attributes that must be present - required_data_attributes: List of attributes within the data that must be present

outputs() tuple[list[str], list[str]]#

Define stage output specification.

Returns (tuple[list[str], list[str]]): Tuple of (output_attributes, output_columns) where: - output_top_level_attributes: List of task attributes this stage adds/modifies - output_data_attributes: List of attributes within the data that this stage adds/modifies

process(
_: nemo_curator.tasks._EmptyTask,
) list[nemo_curator.tasks.FileGroupTask]#

Process the EmptyTask to create PairwiseFileGroupTasks.

Args: task: EmptyTask input (ignored, used for triggering the stage)

Returns: List of PairwiseFileGroupTask, each containing partitioned file groups per centroid

ray_stage_spec() dict[str, Any]#

Ray stage specification for this stage.

setup(
_: nemo_curator.backends.base.WorkerMetadata | None = None,
) None#

Setup method called once before processing begins. Override this method to perform any initialization that should happen once per worker. Args: worker_metadata (WorkerMetadata, optional): Information about the worker (provided by some backends)