nemo_curator.stages.deduplication.exact.identification

Module Contents

Classes

Name                          Description
ExactDuplicateIdentification  Stage that finds exact duplicates in a given column.

Data

EXACT_DUPLICATE_GROUP_FIELD

API

class nemo_curator.stages.deduplication.exact.identification.ExactDuplicateIdentification(
text_field: str,
output_path: str,
input_filetype: typing.Literal['jsonl', 'parquet'] = 'parquet',
read_kwargs: dict[str, typing.Any] | None = None,
write_kwargs: dict[str, typing.Any] | None = None,
assign_id: bool = True,
id_field: str | None = None,
total_nparts: int | None = None,
rmm_pool_size: int | typing.Literal['auto'] | None = 'auto',
spill_memory_limit: int | typing.Literal['auto'] | None = 'auto',
enable_statistics: bool = False
)

Bases: DeduplicationIO, ShuffleStage

Stage that finds exact duplicates in a given column.

Parameters

text_field Field name representing the field to find duplicates in.
output_path Path to write output files.
input_filetype Type of the input files. Must be one of “jsonl” or “parquet”. Default is “parquet”.
read_kwargs Keyword arguments for the cudf.read_parquet method.
write_kwargs Keyword arguments for the cudf.to_parquet method.
assign_id Whether to assign a unique id to each document.
id_field Existing id field name if not assigning a new id.
total_nparts Total number of output partitions. If None, will be set automatically by the executor.
rmm_pool_size Size of the RMM GPU memory pool in bytes. If “auto”, the memory pool is set to 90% of the free GPU memory. If None, the memory pool is set to 50% of the free GPU memory and can expand if needed.
spill_memory_limit Device memory limit in bytes for spilling to host. If “auto”, the limit is set to 80% of the RMM pool size. If None, spilling is disabled.
enable_statistics Whether the underlying rapidsmpf shuffler should collect shuffle statistics.
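
The "auto" and None defaults described for rmm_pool_size and spill_memory_limit reduce to simple arithmetic. The sketch below only illustrates that arithmetic: resolve_memory_settings is a hypothetical helper, not part of nemo_curator, the free-memory figure is supplied by the caller, and deriving the spill limit from the initial 50% pool when rmm_pool_size is None is an assumption.

```python
from typing import Literal, Optional, Tuple, Union

Size = Union[int, Literal["auto"], None]


def resolve_memory_settings(
    free_gpu_bytes: int,
    rmm_pool_size: Size = "auto",
    spill_memory_limit: Size = "auto",
) -> Tuple[int, Optional[int]]:
    """Hypothetical helper: turn the documented defaults into byte counts."""
    if rmm_pool_size == "auto":
        pool = free_gpu_bytes * 9 // 10  # 90% of free GPU memory
    elif rmm_pool_size is None:
        pool = free_gpu_bytes // 2  # 50% initial size; the real pool may grow
    else:
        pool = rmm_pool_size  # explicit size in bytes

    if spill_memory_limit == "auto":
        # 80% of the pool size (assumption: the initial size when the pool can grow)
        spill: Optional[int] = pool * 8 // 10
    elif spill_memory_limit is None:
        spill = None  # spilling disabled
    else:
        spill = spill_memory_limit  # explicit limit in bytes

    return pool, spill


if __name__ == "__main__":
    pool, spill = resolve_memory_settings(free_gpu_bytes=40 * 1024**3)
    print(pool, spill)
```

Integer arithmetic is used here to avoid floating-point rounding; how the library itself rounds or aligns these sizes is not stated in this reference.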

id_field
name = 'ExactDuplicateIds'
output_fs
output_path = self.output_fs.sep.join([output_path, self.name])

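
The output_path attribute is the user-supplied path joined with the stage name, so results land in a subdirectory named after the stage. A minimal sketch of that join, assuming the filesystem separator is "/" (stage_output_path is a hypothetical function used only for illustration):

```python
def stage_output_path(
    output_path: str,
    stage_name: str = "ExactDuplicateIds",
    sep: str = "/",  # assumption: the output filesystem uses "/" as its separator
) -> str:
    """Hypothetical helper mirroring sep.join([output_path, name])."""
    return sep.join([output_path, stage_name])


if __name__ == "__main__":
    print(stage_output_path("/data/dedup"))
    print(stage_output_path("s3://bucket/out/"))
```

Because this is a plain string join, a trailing separator on the input path is not normalized and yields a doubled separator in the result.
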
nemo_curator.stages.deduplication.exact.identification.ExactDuplicateIdentification._get_removal_ids(
df: cudf.DataFrame
) -> cudf.DataFrame

Get the removal ids for the given dataframe.

nemo_curator.stages.deduplication.exact.identification.ExactDuplicateIdentification._hash_and_insert(
df: cudf.DataFrame
) -> None

Hash the text field and insert into the shuffle actor.

Parameters

df DataFrame containing the id_field and text_field columns.

nemo_curator.stages.deduplication.exact.identification.ExactDuplicateIdentification._read_files(
filepaths: list[str]
) -> cudf.DataFrame

Read files and return a DataFrame.

Parameters

filepaths List of file paths to read.

Returns

cudf.DataFrame DataFrame containing the id_field and text_field columns.

nemo_curator.stages.deduplication.exact.identification.ExactDuplicateIdentification.extract_and_write() -> list[nemo_curator.tasks.FileGroupTask]

nemo_curator.stages.deduplication.exact.identification.ExactDuplicateIdentification.inputs() -> tuple[list[str], list[str]]

nemo_curator.stages.deduplication.exact.identification.ExactDuplicateIdentification.insert_finished() -> None

nemo_curator.stages.deduplication.exact.identification.ExactDuplicateIdentification.outputs() -> tuple[list[str], list[str]]

nemo_curator.stages.deduplication.exact.identification.ExactDuplicateIdentification.process(
task: nemo_curator.tasks.FileGroupTask
) -> nemo_curator.tasks.FileGroupTask

nemo_curator.stages.deduplication.exact.identification.ExactDuplicateIdentification.ray_stage_spec() -> dict[str, typing.Any]

nemo_curator.stages.deduplication.exact.identification.ExactDuplicateIdentification.read_and_insert(
task: nemo_curator.tasks.FileGroupTask
) -> nemo_curator.tasks.FileGroupTask

Single task processing is not supported.

Raises

NotImplementedError Always raised as this stage only supports batch processing.

nemo_curator.stages.deduplication.exact.identification.ExactDuplicateIdentification.read_and_insert_batch(
tasks: list[nemo_curator.tasks.FileGroupTask]
) -> list[nemo_curator.tasks.FileGroupTask]

Batch process multiple file group tasks for exact deduplication.

This method reads all files from all tasks, concatenates them (if needed), hashes the text field using MD5, and inserts into the shuffle actor for deduplication. Processing tasks in batches significantly improves throughput by increasing the size of batches inserted during shuffle.

Parameters

tasks List of FileGroupTask objects containing files to process. Must contain at least one task.

Returns

list[FileGroupTask] The input tasks unchanged. The actual deduplication results are written through the shuffle actor as a side effect.

Raises

RuntimeError If ID generator is not initialized when assign_id is True.
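
The hash-then-group idea behind read_and_insert_batch can be illustrated on the CPU with the standard library. This is a sketch of the technique only, not the stage's implementation, which works on cuDF DataFrames and a shuffle actor; find_exact_duplicates is a hypothetical helper, and keeping the lowest id in each duplicate group is an assumption.

```python
import hashlib
from collections import defaultdict


def find_exact_duplicates(docs: dict[int, str]) -> list[int]:
    """Return ids of documents whose text exactly matches an earlier id."""
    groups: dict[str, list[int]] = defaultdict(list)
    for doc_id, text in docs.items():
        # Documents with identical text get identical MD5 digests.
        digest = hashlib.md5(text.encode("utf-8")).hexdigest()
        groups[digest].append(doc_id)

    removal_ids: list[int] = []
    for ids in groups.values():
        # Assumption: keep the lowest id in each group, mark the rest for removal.
        removal_ids.extend(sorted(ids)[1:])
    return sorted(removal_ids)


if __name__ == "__main__":
    docs = {0: "hello", 1: "world", 2: "hello", 3: "hello ", 4: "world"}
    print(find_exact_duplicates(docs))
```

Matching is exact at the byte level, so "hello" and "hello " (with a trailing space) fall into different groups.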

nemo_curator.stages.deduplication.exact.identification.ExactDuplicateIdentification.setup(
_worker_metadata: nemo_curator.backends.base.WorkerMetadata | None = None
) -> None

nemo_curator.stages.deduplication.exact.identification.EXACT_DUPLICATE_GROUP_FIELD = '_exact_duplicate_group'