stages.deduplication.fuzzy.lsh.stage

Module Contents

Classes

LSHStage

Stage that performs LSH on a FileGroupTask containing minhash data.

API

class stages.deduplication.fuzzy.lsh.stage.LSHStage

Bases: nemo_curator.stages.base.ProcessingStage[nemo_curator.tasks.FileGroupTask, nemo_curator.tasks.FileGroupTask]

Stage that performs locality-sensitive hashing (LSH) on a FileGroupTask containing minhash data.

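The executor processes this stage in multiple shuffle iterations. Each iteration covers up to bands_per_iteration bands, so num_bands / bands_per_iteration iterations (rounded up) are run in total.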
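The num_bands and minhashes_per_band parameters describe the standard MinHash LSH banding scheme: each signature is cut into num_bands bands of minhashes_per_band values, and two documents become candidate duplicates if all values in at least one band match. The pure-Python sketch below illustrates that idea only. The helpers band_buckets and candidate_pairs are hypothetical and are not the stage's GPU implementation, which works on dataframes and a distributed shuffle.

```python
import hashlib
from collections import defaultdict


def band_buckets(signature: list[int], num_bands: int, minhashes_per_band: int) -> list[tuple[int, str]]:
    """Split one minhash signature into bands and hash each band to a bucket key."""
    if len(signature) < num_bands * minhashes_per_band:
        raise ValueError("signature is shorter than num_bands * minhashes_per_band")
    keys = []
    for band in range(num_bands):
        start = band * minhashes_per_band
        chunk = signature[start:start + minhashes_per_band]
        # Identical chunks produce the same digest, i.e. land in the same bucket.
        digest = hashlib.sha1(repr(chunk).encode()).hexdigest()
        keys.append((band, digest))
    return keys


def candidate_pairs(
    signatures: dict[str, list[int]], num_bands: int, minhashes_per_band: int
) -> set[tuple[str, str]]:
    """Return ID pairs that share a bucket in at least one band."""
    buckets: dict[tuple[int, str], list[str]] = defaultdict(list)
    for doc_id, signature in signatures.items():
        for key in band_buckets(signature, num_bands, minhashes_per_band):
            buckets[key].append(doc_id)
    pairs = set()
    for ids in buckets.values():
        ids = sorted(ids)
        for i, left in enumerate(ids):
            for right in ids[i + 1:]:
                pairs.add((left, right))
    return pairs


signatures = {"a": [1, 2, 3, 4], "b": [1, 2, 9, 9], "c": [7, 8, 9, 9]}
pairs = candidate_pairs(signatures, num_bands=2, minhashes_per_band=2)
```

Here a and b agree on band 0 and b and c agree on band 1, so the candidate pairs are (a, b) and (b, c), even though a and c share no band.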

Parameters

num_bands: Number of LSH bands.

minhashes_per_band: Number of minhashes per band.

id_field: Name of the ID field in the input data.

minhash_field: Name of the minhash field in the input data.

output_path: Base path to write output files to.

read_kwargs: Keyword arguments for the read method.

write_kwargs: Keyword arguments for the write method.

rmm_pool_size: Size of the RMM GPU memory pool, in bytes. If "auto", the pool is set to 90% of the free GPU memory. If None, the pool is initially set to 50% of the free GPU memory and can expand if needed.

spill_memory_limit: Device memory limit, in bytes, for spilling to host memory. If "auto", the limit is set to 80% of the RMM pool size. If None, spilling is disabled.

enable_statistics: Whether to collect statistics.

bands_per_iteration: Number of bands to process per shuffle iteration. Must be between 1 and num_bands. Higher values reduce the number of shuffle iterations but increase memory usage.

total_nparts: Total number of partitions to write during the shuffle. If None, the executor chooses it automatically as the largest power of 2 that is less than or equal to the number of input tasks.
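The rmm_pool_size and spill_memory_limit descriptions combine into a small set of rules. The sketch below encodes those documented percentages in a hypothetical helper, resolve_memory_settings, that is not part of NeMo Curator. One point is an assumption: when rmm_pool_size is None and spill_memory_limit is "auto", the sketch takes 80% of the initial 50% pool, since the parameter text does not say how a growable pool is handled.

```python
from __future__ import annotations

from typing import Literal


def resolve_memory_settings(
    free_gpu_bytes: int,
    rmm_pool_size: int | Literal["auto"] | None = "auto",
    spill_memory_limit: int | Literal["auto"] | None = "auto",
) -> tuple[int, int | None]:
    """Return (initial_pool_bytes, spill_limit_bytes or None) per the documented rules."""
    if rmm_pool_size == "auto":
        pool = free_gpu_bytes * 9 // 10  # 90% of free GPU memory
    elif rmm_pool_size is None:
        pool = free_gpu_bytes * 5 // 10  # 50% of free GPU memory, may grow later
    else:
        pool = int(rmm_pool_size)  # explicit size in bytes

    if spill_memory_limit == "auto":
        spill = pool * 8 // 10  # 80% of the RMM pool size (assumed: initial size)
    elif spill_memory_limit is None:
        spill = None  # spilling disabled
    else:
        spill = int(spill_memory_limit)
    return pool, spill
```

Integer arithmetic (`* 9 // 10`) is used instead of multiplying by 0.9 so that byte counts stay exact.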

actor_class

None

bands_per_iteration: int

5

enable_statistics: bool

False

extract_and_write() → list[nemo_curator.tasks.FileGroupTask]

get_band_iterations() → collections.abc.Iterator[tuple[int, int]]

Yield the band range, as a tuple[int, int], handled by each shuffle iteration.
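A plausible way to split num_bands into per-iteration ranges is sketched below. It assumes half-open (start, end) ranges, which this page does not confirm, and the function band_iterations is hypothetical; it is not the body of get_band_iterations.

```python
from collections.abc import Iterator


def band_iterations(num_bands: int, bands_per_iteration: int) -> Iterator[tuple[int, int]]:
    """Yield half-open (start, end) band ranges, one per shuffle iteration."""
    if not 1 <= bands_per_iteration <= num_bands:
        raise ValueError("bands_per_iteration must be between 1 and num_bands")
    for start in range(0, num_bands, bands_per_iteration):
        # The last range is shorter when num_bands is not a multiple of bands_per_iteration.
        yield start, min(start + bands_per_iteration, num_bands)
```

For example, 12 bands with bands_per_iteration=5 give the ranges (0, 5), (5, 10) and (10, 12): three shuffle iterations, matching the rounded-up count. Because this is a generator, the range check only runs once iteration starts.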

id_field: str

None

insert_finished() → None

minhash_field: str

None

minhashes_per_band: int

None

num_bands: int

None
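Choosing num_bands and minhashes_per_band trades recall against the number of candidate pairs. Under the textbook MinHash LSH analysis (the probability that two minhashes agree equals the Jaccard similarity s, with independent hash functions), a pair becomes a candidate with probability 1 - (1 - s^r)^b, where r is minhashes_per_band and b is num_bands; the curve is steepest near (1/b)^(1/r). The sketch below evaluates these textbook formulas; it is not taken from NeMo Curator, and the values 20 and 13 are only illustrative.

```python
def candidate_probability(similarity: float, num_bands: int, minhashes_per_band: int) -> float:
    """Probability that two documents with this Jaccard similarity share a bucket."""
    # One band matches only if all of its minhashes agree: s ** r.
    band_match = similarity ** minhashes_per_band
    # The pair is a candidate if at least one of the b bands matches.
    return 1.0 - (1.0 - band_match) ** num_bands


def approximate_threshold(num_bands: int, minhashes_per_band: int) -> float:
    """Similarity near which the candidate probability rises most steeply."""
    return (1.0 / num_bands) ** (1.0 / minhashes_per_band)


for s in (0.5, 0.7, 0.8, 0.9):
    print(s, round(candidate_probability(s, num_bands=20, minhashes_per_band=13), 3))
```

Raising minhashes_per_band moves the threshold up and makes the curve sharper; raising num_bands moves it down, catching more near-duplicates at the cost of more candidate pairs.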

output_path: str

'./'

process(task: nemo_curator.tasks.FileGroupTask) → nemo_curator.tasks.FileGroupTask

Process a task and return the result.

Args:
task (X): Input task to process.

Returns (Y | list[Y]):
- Single task: for 1-to-1 transformations.
- List of tasks: for 1-to-many transformations (e.g., readers).
- None: if the task should be filtered out.
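The three return shapes described for process() can be handled uniformly by normalizing them to a list. The helper below, normalize_process_result, is hypothetical and only illustrates that contract; it is not how the NeMo Curator executor is written, and LSHStage.process itself is annotated as returning a single FileGroupTask.

```python
from __future__ import annotations

from typing import TypeVar

T = TypeVar("T")


def normalize_process_result(result: T | list[T] | None) -> list[T]:
    """Turn the documented return shapes of process() into a list of tasks."""
    if result is None:
        return []  # task filtered out
    if isinstance(result, list):
        return result  # 1-to-many transformation, e.g. a reader
    return [result]  # 1-to-1 transformation
```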

ray_stage_spec() → dict[str, Any]

Ray stage specification for this stage.

read_and_insert(task: nemo_curator.tasks.FileGroupTask, band_range: tuple[int, int]) → nemo_curator.tasks.FileGroupTask

read_kwargs: dict[str, Any] | None

None

rmm_pool_size: int | Literal["auto"] | None

'auto'

spill_memory_limit: int | Literal["auto"] | None

'auto'

teardown() → None

Teardown method called once after processing ends. Override this method to perform any cleanup.

total_nparts: int | None

None
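When total_nparts is None, the parameter description says the executor uses the largest power of 2 that is less than or equal to the number of input tasks. The hypothetical function below computes that value; the executor's real code, including how it treats zero input tasks, may differ.

```python
def default_total_nparts(num_input_tasks: int) -> int:
    """Largest power of 2 that is <= num_input_tasks."""
    if num_input_tasks < 1:
        raise ValueError("need at least one input task")
    # bit_length() - 1 is the index of the highest set bit.
    return 1 << (num_input_tasks.bit_length() - 1)
```

For 100 input tasks this gives 64 partitions, and an exact power of 2 such as 64 is kept as-is.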

write_kwargs: dict[str, Any] | None

None