nemo_curator.stages.deduplication.fuzzy.lsh.stage


Module Contents

Classes

| Name | Description |
| --- | --- |
| LSHStage | Stage that performs LSH on a FileGroupTask containing minhash data. |

API

class nemo_curator.stages.deduplication.fuzzy.lsh.stage.LSHStage(
num_bands: int,
minhashes_per_band: int,
id_field: str = CURATOR_DEDUP_ID_STR,
minhash_field: str = CURATOR_DEFAULT_MINHASH_FIELD,
output_path: str = './',
read_kwargs: dict[str, typing.Any] | None = None,
write_kwargs: dict[str, typing.Any] | None = None,
rmm_pool_size: int | typing.Literal['auto'] | None = 'auto',
spill_memory_limit: int | typing.Literal['auto'] | None = 'auto',
enable_statistics: bool = False,
bands_per_iteration: int = 5,
total_nparts: int | None = None
)
Dataclass

Bases: ProcessingStage[FileGroupTask, FileGroupTask]

Stage that performs locality-sensitive hashing (LSH) on a FileGroupTask containing minhash data.

The executor processes this stage over multiple shuffle iterations, each covering up to bands_per_iteration bands.
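To illustrate how the bands could be divided among those iterations, the sketch below splits `num_bands` bands into consecutive half-open `(start, end)` ranges of at most `bands_per_iteration` bands each. The helper name `band_ranges` and the half-open convention are assumptions for illustration; the stage's own `get_band_iterations()` may differ in its details.

```python
from collections.abc import Iterator


def band_ranges(num_bands: int, bands_per_iteration: int) -> Iterator[tuple[int, int]]:
    """Yield half-open (start, end) band ranges, one per shuffle iteration.

    Hypothetical helper for illustration; not part of nemo_curator.
    """
    # Documented constraint: bands_per_iteration is between 1 and num_bands.
    if not 1 <= bands_per_iteration <= num_bands:
        raise ValueError("bands_per_iteration must be between 1 and num_bands")
    for start in range(0, num_bands, bands_per_iteration):
        # The last range is shorter when num_bands is not a multiple of bands_per_iteration.
        yield (start, min(start + bands_per_iteration, num_bands))


# 20 bands processed 5 at a time -> 4 shuffle iterations.
print(list(band_ranges(20, 5)))  # [(0, 5), (5, 10), (10, 15), (15, 20)]
```

Raising bands_per_iteration to 10 would halve the number of ranges (and therefore shuffle iterations) at the cost of more bands held in memory per iteration, which is the trade-off described for that parameter.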

Parameters

- num_bands: Number of LSH bands.
- minhashes_per_band: Number of minhashes per band.
- id_field: Name of the ID field in the input data.
- minhash_field: Name of the minhash field in the input data.
- output_path: Base path to write output files.
- read_kwargs: Keyword arguments for the read method.
- write_kwargs: Keyword arguments for the write method.
- rmm_pool_size: Size of the RMM GPU memory pool in bytes. If "auto", the memory pool is set to 90% of the free GPU memory. If None, the memory pool starts at 50% of the free GPU memory and can expand if needed.
- spill_memory_limit: Device memory limit in bytes for spilling to host. If "auto", the limit is set to 80% of the RMM pool size. If None, spilling is disabled.
- enable_statistics: Whether to collect statistics.
- bands_per_iteration: Number of bands to process per shuffle iteration; must be between 1 and num_bands. Higher values reduce the number of shuffle iterations but increase memory usage.
- total_nparts: Total number of partitions to write during the shuffle. If None, the executor chooses it automatically as the largest power of 2 that is less than or equal to the number of input tasks.
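`num_bands` and `minhashes_per_band` together set the LSH similarity trade-off. Under the standard banding analysis, if two documents' minhash values agree at each position with probability s (their estimated Jaccard similarity), they share a bucket in at least one band with probability 1 - (1 - s^r)^b, where r = `minhashes_per_band` and b = `num_bands`; the curve rises most steeply near s ≈ (1/b)^(1/r). The sketch below evaluates that formula. The function names and the example configuration (b = 20, r = 13) are hypothetical and are not defaults taken from nemo_curator.

```python
def candidate_probability(similarity: float, num_bands: int, minhashes_per_band: int) -> float:
    """Probability that a pair with the given Jaccard similarity shares at least one band bucket."""
    band_match = similarity ** minhashes_per_band  # all r minhashes in one band agree
    return 1.0 - (1.0 - band_match) ** num_bands  # at least one of the b bands agrees


def approximate_threshold(num_bands: int, minhashes_per_band: int) -> float:
    """Similarity near which the S-curve is steepest: (1/b)^(1/r)."""
    return (1.0 / num_bands) ** (1.0 / minhashes_per_band)


b, r = 20, 13  # hypothetical configuration: 20 bands x 13 minhashes = 260 minhashes
print(f"threshold ~ {approximate_threshold(b, r):.3f}")
for s in (0.5, 0.7, 0.8, 0.9):
    print(f"s={s:.1f}: P(candidate) = {candidate_probability(s, b, r):.3f}")
```

More bands (or fewer minhashes per band) lower the threshold and catch more near-duplicates at the cost of more false-positive candidates; the reverse raises it.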

- bands_per_iteration: int = 5
- enable_statistics: bool = False
- id_field: str = CURATOR_DEDUP_ID_STR
- minhash_field: str = CURATOR_DEFAULT_MINHASH_FIELD
- minhashes_per_band: int
- name = 'LSHStage'
- num_bands: int
- output_path: str = './'
- read_kwargs: dict[str, Any] | None = None
- resources = Resources(gpus=1.0)
- rmm_pool_size: int | Literal['auto'] | None = 'auto'
- spill_memory_limit: int | Literal['auto'] | None = 'auto'
- total_nparts: int | None = None
- write_kwargs: dict[str, Any] | None = None
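The "auto" and None defaults for `rmm_pool_size`, `spill_memory_limit`, and `total_nparts` described under Parameters reduce to simple arithmetic. The sketch below restates those documented rules as plain functions: 90% of free GPU memory for an "auto" pool, 50% as the starting size for a None pool, 80% of the pool for an "auto" spill limit, and the largest power of 2 not exceeding the input task count for `total_nparts`. All function names and the toy byte counts are hypothetical; in practice these values are resolved by nemo_curator and its executor, and how the spill limit interacts with a growable (None) pool is not covered here.

```python
from __future__ import annotations

from typing import Literal


def resolve_rmm_pool_size(value: int | Literal["auto"] | None, free_gpu_bytes: int) -> int:
    """Initial RMM pool size in bytes under the documented rules (hypothetical helper)."""
    if value == "auto":
        return free_gpu_bytes * 9 // 10  # "auto": 90% of free GPU memory
    if value is None:
        return free_gpu_bytes // 2  # None: starts at 50% and may grow on demand
    return value  # explicit byte count is used as-is


def resolve_spill_limit(value: int | Literal["auto"] | None, pool_bytes: int) -> int | None:
    """Device memory limit for spilling to host (hypothetical helper)."""
    if value == "auto":
        return pool_bytes * 8 // 10  # "auto": 80% of the RMM pool size
    return value  # None disables spilling; an int is used as-is


def default_total_nparts(num_input_tasks: int) -> int:
    """Largest power of 2 that is <= num_input_tasks (used when total_nparts is None)."""
    if num_input_tasks < 1:
        raise ValueError("need at least one input task")
    return 1 << (num_input_tasks.bit_length() - 1)


free = 10_000  # toy "free GPU memory" in bytes, purely illustrative
pool = resolve_rmm_pool_size("auto", free)
spill = resolve_spill_limit("auto", pool)
print(pool, spill, default_total_nparts(100))  # 9000 7200 64
```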
nemo_curator.stages.deduplication.fuzzy.lsh.stage.LSHStage.__post_init__()
nemo_curator.stages.deduplication.fuzzy.lsh.stage.LSHStage._check_actor_obj() -> None
nemo_curator.stages.deduplication.fuzzy.lsh.stage.LSHStage.extract_and_write() -> list[nemo_curator.tasks.FileGroupTask]
nemo_curator.stages.deduplication.fuzzy.lsh.stage.LSHStage.get_band_iterations() -> collections.abc.Iterator[tuple[int, int]]

Get all band ranges for iteration.

nemo_curator.stages.deduplication.fuzzy.lsh.stage.LSHStage.insert_finished() -> None
nemo_curator.stages.deduplication.fuzzy.lsh.stage.LSHStage.process(
task: nemo_curator.tasks.FileGroupTask
) -> nemo_curator.tasks.FileGroupTask
nemo_curator.stages.deduplication.fuzzy.lsh.stage.LSHStage.ray_stage_spec() -> dict[str, typing.Any]

Ray stage specification for this stage.

nemo_curator.stages.deduplication.fuzzy.lsh.stage.LSHStage.read_and_insert(
task: nemo_curator.tasks.FileGroupTask,
band_range: tuple[int, int]
) -> nemo_curator.tasks.FileGroupTask
nemo_curator.stages.deduplication.fuzzy.lsh.stage.LSHStage.teardown() -> None
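`read_and_insert` takes a `band_range`. Conceptually, applying LSH to a band range means that, for each band in the range, documents whose minhash values agree on every position of that band fall into the same bucket and become duplicate candidates. The CPU-only sketch below shows that grouping with plain Python dictionaries; it is not the stage's GPU and shuffle-based implementation, and the helper `bucket_band_range`, the signature layout (band i covers positions i*r through (i+1)*r - 1), and the toy data are all assumptions.

```python
from collections import defaultdict


def bucket_band_range(
    signatures: dict[str, list[int]],
    minhashes_per_band: int,
    band_range: tuple[int, int],
) -> dict[tuple[int, tuple[int, ...]], list[str]]:
    """Group document IDs by (band index, band values) for bands in [start, end).

    Hypothetical helper; only buckets containing two or more documents are returned.
    """
    start, end = band_range
    buckets: dict[tuple[int, tuple[int, ...]], list[str]] = defaultdict(list)
    for doc_id, minhashes in signatures.items():
        for band in range(start, end):
            lo = band * minhashes_per_band
            band_values = tuple(minhashes[lo : lo + minhashes_per_band])
            buckets[(band, band_values)].append(doc_id)
    # Singleton buckets yield no candidate pairs, so drop them.
    return {key: ids for key, ids in buckets.items() if len(ids) > 1}


# Toy data: 2 bands x 2 minhashes per band; doc_a and doc_b agree on band 0 only.
signatures = {
    "doc_a": [1, 2, 3, 4],
    "doc_b": [1, 2, 9, 9],
    "doc_c": [5, 6, 7, 8],
}
print(bucket_band_range(signatures, minhashes_per_band=2, band_range=(0, 2)))
# {(0, (1, 2)): ['doc_a', 'doc_b']}
```

Keying buckets on the band index as well as the band values keeps identical value tuples in different bands from colliding, which matches the usual banding scheme.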