stages.deduplication.semantic.pairwise_io
Module Contents
Classes
ClusterWiseFilePartitioningStage | Stage that partitions input files into PairwiseFileGroupTasks for deduplication.
API
- class stages.deduplication.semantic.pairwise_io.ClusterWiseFilePartitioningStage(
- input_path: str,
- storage_options: dict[str, Any] | None = None,
Bases: nemo_curator.stages.base.ProcessingStage[nemo_curator.tasks._EmptyTask, nemo_curator.tasks.FileGroupTask]
Stage that partitions input files into PairwiseFileGroupTasks for deduplication.
This stage takes an EmptyTask as input and outputs partition-aware file groups. It reads parquet files partitioned by centroid (from kmeans output) and creates one PairwiseFileGroupTask per centroid partition.
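The partitioning described above can be illustrated with a minimal, standalone sketch. It does not use the library's own classes: `FileGroup` and `group_files_by_centroid` are hypothetical names, and the Hive-style `centroid=<id>` directory layout is an assumption about how the kmeans output is partitioned, not something this page confirms.

```python
from __future__ import annotations

import os
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class FileGroup:
    """Hypothetical stand-in for a per-centroid file group task."""

    centroid_id: int
    files: list[str] = field(default_factory=list)


def group_files_by_centroid(input_path: str, limit: int | None = None) -> list[FileGroup]:
    """Group parquet files under input_path by their centroid partition."""
    groups: dict[int, list[str]] = defaultdict(list)
    for root, _dirs, files in os.walk(input_path):
        # Assumed layout: <input_path>/centroid=<id>/<file>.parquet
        rel_parts = os.path.relpath(root, input_path).split(os.sep)
        centroid_id = None
        for part in rel_parts:
            if part.startswith("centroid="):
                centroid_id = int(part.split("=", 1)[1])
        if centroid_id is None:
            continue  # not inside a centroid partition directory
        for name in files:
            if name.endswith(".parquet"):
                groups[centroid_id].append(os.path.join(root, name))

    # One group per centroid, in ascending centroid order.
    ordered = [FileGroup(cid, sorted(paths)) for cid, paths in sorted(groups.items())]
    # `limit` caps the number of partitions returned, mirroring the documented argument.
    return ordered if limit is None else ordered[:limit]
```

Calling `group_files_by_centroid("/path/to/kmeans_output", limit=10)` would return at most ten groups, one per centroid directory found. The real stage may discover and order partitions differently.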
Initialization
Initialize the partitioning stage.
Args:
- input_path: Path to the kmeans output directory containing centroid partitions.
- storage_options: Storage options for reading files.
- limit: Maximum number of partitions to process.
- inputs() → tuple[list[str], list[str]]
Define stage input requirements.
Returns (tuple[list[str], list[str]]): Tuple of (required_top_level_attributes, required_data_attributes), where:
- required_top_level_attributes: List of task attributes that must be present.
- required_data_attributes: List of attributes within the data that must be present.
- outputs() → tuple[list[str], list[str]]
Define stage output specification.
Returns (tuple[list[str], list[str]]): Tuple of (output_top_level_attributes, output_data_attributes), where:
- output_top_level_attributes: List of task attributes this stage adds or modifies.
- output_data_attributes: List of attributes within the data that this stage adds or modifies.
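To make the two-list return shape of `inputs()` and `outputs()` concrete, here is a small sketch of how such a declaration could be checked against a task. Everything in it is hypothetical (`ToyTask`, `ToyStage`, `missing_requirements`, and the `embedding` and `centroid` attribute names); it illustrates the contract only and is not how nemo_curator itself validates tasks.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyTask:
    """Hypothetical task: top-level attributes plus a dict of data attributes."""

    task_id: str
    data: dict = field(default_factory=dict)


class ToyStage:
    """Hypothetical stage declaring what it needs and what it produces."""

    def inputs(self) -> tuple[list[str], list[str]]:
        # (required top-level task attributes, required attributes within the data)
        return ["data"], ["embedding"]

    def outputs(self) -> tuple[list[str], list[str]]:
        # (top-level attributes added/modified, data attributes added/modified)
        return ["data"], ["embedding", "centroid"]


def missing_requirements(stage: ToyStage, task: ToyTask) -> list[str]:
    """Return the declared input requirements that the task does not satisfy."""
    top_level, data_attrs = stage.inputs()
    missing = [f"task.{name}" for name in top_level if not hasattr(task, name)]
    data = getattr(task, "data", None) or {}
    missing += [f"data[{name!r}]" for name in data_attrs if name not in data]
    return missing
```

A task whose data lacks `embedding` would be reported as missing `data['embedding']`, while one that carries it yields an empty list.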
- process(
- _: nemo_curator.tasks._EmptyTask,
Process the EmptyTask to create PairwiseFileGroupTasks.
Args: task: EmptyTask input (ignored; it only triggers the stage). The signature names this parameter `_`.
Returns: A list of PairwiseFileGroupTask objects, each containing the file groups for one centroid partition.
- ray_stage_spec() → dict[str, Any]
Ray stage specification for this stage.
- setup(
- _: nemo_curator.backends.base.WorkerMetadata | None = None,
Setup method called once before processing begins. Override this method to perform any initialization that should happen once per worker.
Args:
- worker_metadata (WorkerMetadata, optional): Information about the worker (provided by some backends).