stages.deduplication.exact.identification

Module Contents

Classes

ExactDuplicateIdentification

Stage that finds exact duplicates in a given column.

Data

API

stages.deduplication.exact.identification.EXACT_DUPLICATE_GROUP_FIELD

'_exact_duplicate_group'

class stages.deduplication.exact.identification.ExactDuplicateIdentification(
text_field: str,
output_path: str,
input_filetype: Literal['jsonl', 'parquet'] = 'parquet',
read_kwargs: dict[str, Any] | None = None,
write_kwargs: dict[str, Any] | None = None,
assign_id: bool = True,
id_field: str | None = None,
total_nparts: int | None = None,
rmm_pool_size: int | Literal['auto'] | None = 'auto',
spill_memory_limit: int | Literal['auto'] | None = 'auto',
enable_statistics: bool = False,
)

Bases: nemo_curator.stages.deduplication.io_utils.DeduplicationIO, nemo_curator.stages.deduplication.shuffle_utils.stage.ShuffleStage

Stage that finds exact duplicates in a given column.
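
To make the idea concrete, here is a minimal CPU-only sketch of exact duplicate identification: hash the text field, group documents by hash, and report every document whose hash occurs more than once. It is an illustration, not the stage's implementation. The real stage runs on GPU and shuffles data across workers; the function name `find_exact_duplicates`, the use of SHA-256, the `id` field, and the shape of the returned records are all assumptions made for this example.

```python
import hashlib
from collections import defaultdict

# Same value as the module constant documented on this page.
EXACT_DUPLICATE_GROUP_FIELD = "_exact_duplicate_group"


def find_exact_duplicates(docs, text_field, id_field="id"):
    """Return one record per document whose text occurs more than once.

    Hypothetical helper for illustration only; not part of nemo_curator.
    """
    groups = defaultdict(list)
    for doc in docs:
        # Identical text always yields an identical digest, so the digest
        # can serve as the duplicate-group key.
        digest = hashlib.sha256(doc[text_field].encode("utf-8")).hexdigest()
        groups[digest].append(doc[id_field])

    return [
        {id_field: doc_id, EXACT_DUPLICATE_GROUP_FIELD: digest}
        for digest, ids in groups.items()
        if len(ids) > 1  # keep only groups that contain duplicates
        for doc_id in ids
    ]


docs = [
    {"id": 0, "text": "hello world"},
    {"id": 1, "text": "something else"},
    {"id": 2, "text": "hello world"},
]
duplicates = find_exact_duplicates(docs, text_field="text")
```

Documents 0 and 2 end up in the same group because their text is byte-for-byte identical; document 1 is unique and is not reported.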

Parameters

text_field: Name of the field in which to find duplicates.
output_path: Path to write output files.
input_filetype: Type of the input files. Must be one of "jsonl" or "parquet". Default is "parquet".
read_kwargs: Keyword arguments for the cudf.read_parquet method.
write_kwargs: Keyword arguments for the cudf.to_parquet method.
assign_id: Whether to assign a unique id to each document.
id_field: Existing id field name if not assigning a new id.
total_nparts: Total number of output partitions. If None, it is set automatically by the executor.
rmm_pool_size: Size of the RMM GPU memory pool in bytes. If "auto", the memory pool is set to 90% of the free GPU memory. If None, the memory pool is set to 50% of the free GPU memory and can expand if needed.
spill_memory_limit: Device memory limit in bytes for spilling to host. If "auto", the limit is set to 80% of the RMM pool size. If None, spilling is disabled.
enable_statistics: Whether the underlying rapidsmpf shuffler should collect shuffle statistics.
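
The percentages described for rmm_pool_size and spill_memory_limit can be worked through with a small sketch. The two helper functions below are hypothetical and only reproduce the arithmetic stated in the parameter descriptions; how the library actually queries free GPU memory, and any rounding or alignment it applies, is not shown here.

```python
def resolve_rmm_pool_size(rmm_pool_size, free_gpu_bytes):
    """Hypothetical helper: documented pool-size rules, in integer arithmetic."""
    if rmm_pool_size == "auto":
        return free_gpu_bytes * 90 // 100  # 90% of free GPU memory
    if rmm_pool_size is None:
        return free_gpu_bytes * 50 // 100  # 50% initially; the pool may grow
    return int(rmm_pool_size)  # explicit size in bytes


def resolve_spill_limit(spill_memory_limit, pool_bytes):
    """Hypothetical helper: documented spill-limit rules."""
    if spill_memory_limit == "auto":
        return pool_bytes * 80 // 100  # 80% of the RMM pool size
    if spill_memory_limit is None:
        return None  # spilling disabled
    return int(spill_memory_limit)  # explicit limit in bytes


# Example with the defaults ("auto", "auto") and 40 GiB of free GPU memory.
free_bytes = 40 * 1024**3
pool = resolve_rmm_pool_size("auto", free_bytes)
spill = resolve_spill_limit("auto", pool)
```

With both parameters left at "auto", the spill limit works out to 0.9 × 0.8 = 72% of the free GPU memory.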

Initialization

extract_and_write() → list[nemo_curator.tasks.FileGroupTask]

inputs() → tuple[list[str], list[str]]

Define stage input requirements.

Returns (tuple[list[str], list[str]]): Tuple of (required_attributes, required_columns) where:
- required_top_level_attributes: List of task attributes that must be present
- required_data_attributes: List of attributes within the data that must be present

insert_finished() → None

outputs() → tuple[list[str], list[str]]

Define stage output specification.

Returns (tuple[list[str], list[str]]): Tuple of (output_attributes, output_columns) where:
- output_top_level_attributes: List of task attributes this stage adds/modifies
- output_data_attributes: List of attributes within the data that this stage adds/modifies
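
The tuple shape shared by inputs() and outputs() can be illustrated with a stand-in class. Everything in this sketch is hypothetical: `SketchStage`, `missing_requirements`, and the specific attribute and column names returned are assumptions chosen for illustration, since this page does not state what the real stage returns.

```python
class SketchStage:
    """Hypothetical stand-in; not the real ExactDuplicateIdentification."""

    def __init__(self, text_field):
        self.text_field = text_field

    def inputs(self):
        # (required top-level task attributes, required data attributes)
        return ["data"], [self.text_field]

    def outputs(self):
        # (task attributes added/modified, data attributes added/modified)
        return ["data"], ["_exact_duplicate_group"]


def missing_requirements(stage, task_attrs, data_columns):
    """Return the required attributes and columns that are absent."""
    required_attrs, required_cols = stage.inputs()
    return (
        [a for a in required_attrs if a not in task_attrs],
        [c for c in required_cols if c not in data_columns],
    )


stage = SketchStage(text_field="text")
ok = missing_requirements(stage, {"data"}, {"text", "id"})
bad = missing_requirements(stage, {"data"}, {"id"})
```

A check of this kind lets a pipeline fail early when an upstream stage does not provide what a downstream stage declares it needs.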

process(
task: nemo_curator.tasks.FileGroupTask,
) → nemo_curator.tasks.FileGroupTask

Not implemented for actor-based stages.

ray_stage_spec() → dict[str, Any]

Ray stage specification for this stage.

read_and_insert(
task: nemo_curator.tasks.FileGroupTask,
) → nemo_curator.tasks.FileGroupTask

Read files and insert into shuffler.

setup(_worker_metadata: WorkerMetadata | None = None) → None

Setup method called once before processing begins. Override this method to perform any initialization that should happen once per worker.

Args:
- _worker_metadata (WorkerMetadata, optional): Information about the worker (provided by some backends)