stages.deduplication.fuzzy.lsh.stage
Module Contents
Classes
- LSHStage: Stage that performs LSH on a FileGroupTask containing minhash data.
API
- class stages.deduplication.fuzzy.lsh.stage.LSHStage
Bases: nemo_curator.stages.base.ProcessingStage[nemo_curator.tasks.FileGroupTask, nemo_curator.tasks.FileGroupTask]
Stage that performs locality-sensitive hashing (LSH) on a FileGroupTask containing minhash data.
The executor processes this stage over multiple shuffle iterations, each covering up to bands_per_iteration bands.
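As a rough illustration of the banding idea behind this stage, the sketch below splits each document's minhash signature into num_bands bands of minhashes_per_band values, hashes each band to a bucket key, and groups documents that share a bucket in any band. It is a pure-Python sketch, not the stage's GPU implementation: the helper names band_buckets and candidate_groups are hypothetical, and the SHA-1 band hash is an assumption chosen only for illustration.

```python
from __future__ import annotations

import hashlib
from collections import defaultdict


def band_buckets(
    signature: list[int], num_bands: int, minhashes_per_band: int
) -> list[tuple[int, str]]:
    """Split one minhash signature into bands and hash each band to a bucket key."""
    expected = num_bands * minhashes_per_band
    if len(signature) != expected:
        raise ValueError(f"expected {expected} minhashes, got {len(signature)}")
    buckets = []
    for band in range(num_bands):
        start = band * minhashes_per_band
        chunk = signature[start : start + minhashes_per_band]
        # Assumption: SHA-1 over the band's values; identical bands give identical keys.
        digest = hashlib.sha1(",".join(map(str, chunk)).encode()).hexdigest()
        buckets.append((band, digest))
    return buckets


def candidate_groups(
    signatures: dict[str, list[int]], num_bands: int, minhashes_per_band: int
) -> list[set[str]]:
    """Group document IDs that share a bucket in at least one band."""
    table: dict[tuple[int, str], set[str]] = defaultdict(set)
    for doc_id, sig in signatures.items():
        for key in band_buckets(sig, num_bands, minhashes_per_band):
            table[key].add(doc_id)
    # Only buckets holding two or more documents yield candidate duplicates.
    return [ids for ids in table.values() if len(ids) > 1]


if __name__ == "__main__":
    sigs = {"a": [1, 2, 3, 4], "b": [1, 2, 9, 9], "c": [5, 6, 7, 8]}
    groups = candidate_groups(sigs, num_bands=2, minhashes_per_band=2)
    print([sorted(g) for g in groups])  # [['a', 'b']]
```

Documents "a" and "b" agree on their first band, so they land in the same bucket and become candidates, while "c" shares no band with either.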
Parameters
num_bands: Number of LSH bands.
minhashes_per_band: Number of minhashes per band.
id_field: Name of the ID field in the input data.
minhash_field: Name of the minhash field in the input data.
output_path: Base path to write output files.
read_kwargs: Keyword arguments for the read method.
write_kwargs: Keyword arguments for the write method.
rmm_pool_size: Size of the RMM GPU memory pool in bytes. If "auto", the pool is set to 90% of the free GPU memory. If None, the pool is set to 50% of the free GPU memory and can expand if needed.
spill_memory_limit: Device memory limit in bytes for spilling to host. If "auto", the limit is set to 80% of the RMM pool size. If None, spilling is disabled.
enable_statistics: Whether to collect statistics.
bands_per_iteration: Number of bands to process per shuffle iteration; must be between 1 and num_bands. Higher values reduce the number of shuffle iterations but increase memory usage.
total_nparts: Total number of partitions to write during the shuffle. If None, the executor chooses the number of partitions automatically as the largest power of 2 that is less than or equal to the number of input tasks.
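The parameter constraints above can be checked before any data is read. The following sketch uses a hypothetical LSHParams dataclass (not part of NeMo Curator) and assumes each minhash signature must supply exactly num_bands * minhashes_per_band values; the real stage may validate or clamp these parameters differently.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class LSHParams:
    """Hypothetical container mirroring a subset of LSHStage's parameters."""

    num_bands: int
    minhashes_per_band: int
    bands_per_iteration: int = 5

    def __post_init__(self) -> None:
        if self.num_bands < 1 or self.minhashes_per_band < 1:
            raise ValueError("num_bands and minhashes_per_band must be at least 1")
        # Documented constraint: 1 <= bands_per_iteration <= num_bands.
        if not 1 <= self.bands_per_iteration <= self.num_bands:
            raise ValueError("bands_per_iteration must be between 1 and num_bands")

    @property
    def signature_length(self) -> int:
        # Assumption: the signature holds exactly num_bands * minhashes_per_band values.
        return self.num_bands * self.minhashes_per_band
```

With the example values num_bands=20 and minhashes_per_band=13, the default bands_per_iteration of 5 is accepted and signature_length is 260; bands_per_iteration=21 would be rejected.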
- actor_class
None
- bands_per_iteration: int
5
- enable_statistics: bool
False
- extract_and_write() -> list[nemo_curator.tasks.FileGroupTask]
- get_band_iterations() -> collections.abc.Iterator[tuple[int, int]]
Get all band ranges for iteration.
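One plausible way to produce such band ranges is sketched below. The function band_iterations is hypothetical, and the half-open (start, end) convention is an assumption; the real get_band_iterations may use a different range convention.

```python
from __future__ import annotations

from collections.abc import Iterator


def band_iterations(num_bands: int, bands_per_iteration: int) -> Iterator[tuple[int, int]]:
    """Yield half-open (start, end) band ranges covering bands 0..num_bands-1."""
    if not 1 <= bands_per_iteration <= num_bands:
        raise ValueError("bands_per_iteration must be between 1 and num_bands")
    for start in range(0, num_bands, bands_per_iteration):
        # The last range is cut short when num_bands is not a multiple of bands_per_iteration.
        yield start, min(start + bands_per_iteration, num_bands)


if __name__ == "__main__":
    print(list(band_iterations(12, 5)))  # [(0, 5), (5, 10), (10, 12)]
```

Each yielded range corresponds to one shuffle iteration, so a larger bands_per_iteration means fewer, heavier iterations.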
- id_field: str
None
- insert_finished() -> None
- minhash_field: str
None
- minhashes_per_band: int
None
- num_bands: int
None
- output_path: str
'./'
- process(task: nemo_curator.tasks.FileGroupTask)
Process a task and return the result.
Args: task (X): Input task to process.
Returns (Y | list[Y]): a single task for 1-to-1 transformations, a list of tasks for 1-to-many transformations (e.g., readers), or None if the task should be filtered out.
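A caller that consumes process() has to handle all three documented return shapes. The helper below is a hypothetical sketch of that contract that normalizes every shape to a list; it is not code from NeMo Curator.

```python
from __future__ import annotations

from typing import TypeVar

T = TypeVar("T")


def normalize_output(result: T | list[T] | None) -> list[T]:
    """Normalize the documented process() return shapes to a list of tasks."""
    if result is None:
        return []  # the task was filtered out
    if isinstance(result, list):
        return result  # 1-to-many transformation, e.g. a reader
    return [result]  # 1-to-1 transformation
```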
- ray_stage_spec() -> dict[str, Any]
Ray stage specification for this stage.
- read_and_insert(task: nemo_curator.tasks.FileGroupTask, band_range: tuple[int, int])
- read_kwargs: dict[str, Any] | None
None
- rmm_pool_size: int | Literal['auto'] | None
'auto'
- spill_memory_limit: int | Literal['auto'] | None
'auto'
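The rules for rmm_pool_size and spill_memory_limit given in the Parameters section can be expressed as a small resolution function. This is a sketch under stated assumptions: resolve_memory is hypothetical, free GPU memory is passed in rather than queried from the device, only the None pool setting is treated as expandable, and an "auto" spill limit is taken as 80% of the initial pool size.

```python
from __future__ import annotations

from typing import Literal


def resolve_memory(
    free_gpu_bytes: int,
    rmm_pool_size: int | Literal["auto"] | None = "auto",
    spill_memory_limit: int | Literal["auto"] | None = "auto",
) -> tuple[int, bool, int | None]:
    """Return (initial_pool_bytes, pool_can_expand, spill_limit_bytes)."""
    if rmm_pool_size == "auto":
        pool, expandable = free_gpu_bytes * 9 // 10, False  # 90% of free memory
    elif rmm_pool_size is None:
        pool, expandable = free_gpu_bytes // 2, True  # 50%, allowed to grow
    else:
        pool, expandable = rmm_pool_size, False  # explicit byte count
    if spill_memory_limit == "auto":
        spill = pool * 8 // 10  # assumption: 80% of the initial pool size
    elif spill_memory_limit is None:
        spill = None  # spilling to host is disabled
    else:
        spill = spill_memory_limit
    return pool, expandable, spill
```

Integer arithmetic is used so byte counts stay exact instead of passing through floating point.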
- teardown() -> None
Teardown method called once after processing ends. Override this method to perform any cleanup.
- total_nparts: int | None
None
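When total_nparts is None, the documented default is the largest power of 2 that does not exceed the number of input tasks. The hypothetical helper below shows that rule with integer bit arithmetic; it is not the executor's actual code, and rejecting zero input tasks is a choice made only for this sketch.

```python
from __future__ import annotations


def resolve_total_nparts(total_nparts: int | None, num_input_tasks: int) -> int:
    """Return total_nparts, or the largest power of 2 <= num_input_tasks if it is None."""
    if total_nparts is not None:
        return total_nparts
    if num_input_tasks < 1:
        raise ValueError("num_input_tasks must be at least 1")
    # 1 << (bit_length - 1) keeps only the highest set bit of num_input_tasks.
    return 1 << (num_input_tasks.bit_length() - 1)
```

For example, 100 input tasks give 64 partitions, while exactly 128 input tasks give 128.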
- write_kwargs: dict[str, Any] | None
None