stages.deduplication.shuffle_utils.stage
Module Contents
Classes
ShuffleStage | Stage that performs generic shuffling on specified columns from a FileGroupTask. This stage uses the BulkRapidsMPFShuffler with cuDF I/O for efficient GPU-based shuffling.
API
- class stages.deduplication.shuffle_utils.stage.ShuffleStage(
- shuffle_on: list[str],
- total_nparts: int | None = None,
- output_path: str = './',
- read_kwargs: dict[str, Any] | None = None,
- write_kwargs: dict[str, Any] | None = None,
- rmm_pool_size: int | Literal['auto'] | None = 'auto',
- spill_memory_limit: int | Literal['auto'] | None = 'auto',
- enable_statistics: bool = False,
- )
Bases: nemo_curator.stages.base.ProcessingStage[nemo_curator.tasks.FileGroupTask, nemo_curator.tasks.FileGroupTask]
Stage that performs generic shuffling on specified columns from a FileGroupTask. This stage uses the BulkRapidsMPFShuffler with cuDF I/O for efficient GPU-based shuffling.
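To make the idea of shuffling on columns concrete, here is a minimal CPU-only sketch. It assumes hash partitioning, a common way to implement this kind of shuffle; it does not reproduce the BulkRapidsMPFShuffler or any cuDF call, and the helper names `partition_id` and `toy_shuffle` are hypothetical.

```python
import zlib
from collections import defaultdict


def partition_id(row: dict, shuffle_on: list[str], total_nparts: int) -> int:
    """Map a row to a partition using a stable hash of its key columns.

    Hypothetical helper: zlib.crc32 stands in for whatever hash a real
    shuffler uses, chosen here because it is deterministic across runs.
    """
    key = "\x1f".join(str(row[col]) for col in shuffle_on)
    return zlib.crc32(key.encode("utf-8")) % total_nparts


def toy_shuffle(rows: list[dict], shuffle_on: list[str], total_nparts: int) -> dict:
    """Group rows so that equal keys always land in the same partition."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[partition_id(row, shuffle_on, total_nparts)].append(row)
    return dict(partitions)


rows = [
    {"doc_id": 1, "bucket": "a"},
    {"doc_id": 2, "bucket": "b"},
    {"doc_id": 3, "bucket": "a"},
    {"doc_id": 4, "bucket": "c"},
]
shuffled = toy_shuffle(rows, shuffle_on=["bucket"], total_nparts=2)
```

The property that matters for deduplication is that rows sharing the same values in the `shuffle_on` columns end up in the same output partition, so each partition can be processed independently afterwards.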
Parameters
- shuffle_on: List of column names to shuffle on.
- total_nparts: Total number of output partitions. If None, will be set automatically by the executor.
- output_path: Path to write output files.
- read_kwargs: Keyword arguments for cudf.read_parquet method.
- write_kwargs: Keyword arguments for cudf.to_parquet method.
- rmm_pool_size: Size of the RMM GPU memory pool in bytes. If “auto”, the memory pool is set to 90% of the free GPU memory. If None, the memory pool is set to 50% of the free GPU memory that can expand if needed.
- spill_memory_limit: Device memory limit in bytes for spilling to host. If “auto”, the limit is set to 80% of the RMM pool size. If None, spilling is disabled.
- enable_statistics: Whether the underlying rapidsmpf shuffler should collect shuffle statistics.
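The sizing rules for rmm_pool_size and spill_memory_limit can be worked through with plain arithmetic. The sketch below is only an illustration of the percentages stated above: the function names are hypothetical, the free-memory figure is supplied by the caller rather than queried from a GPU, and any rounding or alignment the real stage applies is not modeled.

```python
GIB = 2**30


def resolve_rmm_pool_size(rmm_pool_size, free_gpu_bytes: int) -> int:
    """Hypothetical helper mirroring the documented rmm_pool_size rules."""
    if rmm_pool_size == "auto":
        return free_gpu_bytes * 9 // 10  # 90% of free GPU memory
    if rmm_pool_size is None:
        return free_gpu_bytes * 5 // 10  # 50% initial size; the pool may grow
    return int(rmm_pool_size)  # explicit size in bytes


def resolve_spill_memory_limit(spill_memory_limit, pool_bytes: int):
    """Hypothetical helper mirroring the documented spill_memory_limit rules."""
    if spill_memory_limit == "auto":
        return pool_bytes * 8 // 10  # 80% of the RMM pool size
    if spill_memory_limit is None:
        return None  # spilling disabled
    return int(spill_memory_limit)  # explicit limit in bytes


# Example: a GPU reporting 40 GiB free (an assumed figure).
free_bytes = 40 * GIB
pool = resolve_rmm_pool_size("auto", free_bytes)
spill = resolve_spill_memory_limit("auto", pool)
print(f"pool: {pool / GIB:.1f} GiB, spill limit: {spill / GIB:.1f} GiB")
```

With both settings left at “auto”, the spill limit works out to roughly 72% of the free GPU memory (80% of 90%).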
Initialization
- actor_class
None
- extract_and_write() → list[nemo_curator.tasks.FileGroupTask]
- insert_finished() → None
- process(
- task: nemo_curator.tasks.FileGroupTask,
- )
Not implemented for actor-based stages.
- ray_stage_spec() → dict[str, Any]
Ray stage specification for this stage.
- read_and_insert(
- task: nemo_curator.tasks.FileGroupTask,
- )
Read files and insert into shuffler.
- teardown() → None
Teardown method called once after processing ends. Override this method to perform any cleanup.