nemo_curator.stages.text.deduplication.removal

Removal stage for distributed deduplication pipeline.

This stage implements the removal phase of the distributed deduplication approach:

  1. Takes a DocumentBatch and determines the min/max ID range
  2. Filters the parquet files for IDs to remove within this range
  3. Filters out documents based on the removal list
  4. Returns the filtered DocumentBatch
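The four steps above can be sketched with plain pandas. This is a hypothetical helper for illustration, not the stage's actual implementation:

```python
import pandas as pd

def remove_duplicates_in_range(
    batch: pd.DataFrame,
    removal_ids: pd.DataFrame,
    id_field: str = "id",
    duplicate_id_field: str = "id",
) -> pd.DataFrame:
    # 1. Determine the min/max ID range of this batch.
    min_id, max_id = batch[id_field].min(), batch[id_field].max()
    # 2. Restrict the removal list to IDs within this range.
    in_range = removal_ids.loc[
        removal_ids[duplicate_id_field].between(min_id, max_id),
        duplicate_id_field,
    ]
    # 3 & 4. Drop documents whose IDs appear in the removal list
    # and return the filtered batch.
    return batch[~batch[id_field].isin(in_range)]
```

The range restriction in step 2 matters because each worker processes only one batch; it never needs removal IDs that cannot occur in that batch.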

Module Contents

Classes

TextDuplicatesRemovalStage
Stage for removing duplicate documents based on pre-computed removal lists.

API

class nemo_curator.stages.text.deduplication.removal.TextDuplicatesRemovalStage(
    ids_to_remove_path: str,
    id_field: str = CURATOR_DEDUP_ID_STR,
    duplicate_id_field: str = 'id',
    read_kwargs: dict[str, typing.Any] | None = None
)
Dataclass

Bases: ProcessingStage[DocumentBatch, DocumentBatch]

Stage for removing duplicate documents based on pre-computed removal lists.

Parameters:

ids_to_remove_path
str

Path to parquet files containing IDs to remove

id_field
str, defaults to CURATOR_DEDUP_ID_STR

Field to use for deduplication within the input dataframe. Defaults to CURATOR_DEDUP_ID_STR.

duplicate_id_field
str, defaults to 'id'

Field to use for deduplication within the removal dataframe. Defaults to 'id'.

read_kwargs
dict[str, Any] | None, defaults to None

Additional arguments for reading parquet files

duplicate_id_field
str = 'id'
id_field
str = CURATOR_DEDUP_ID_STR
ids_to_remove_path
str
read_kwargs
dict[str, Any] | None = None
nemo_curator.stages.text.deduplication.removal.TextDuplicatesRemovalStage.__post_init__()

Initialize parent class after dataclass initialization.

nemo_curator.stages.text.deduplication.removal.TextDuplicatesRemovalStage.inputs() -> tuple[list[str], list[str]]
nemo_curator.stages.text.deduplication.removal.TextDuplicatesRemovalStage.process(
task: nemo_curator.tasks.DocumentBatch
) -> nemo_curator.tasks.DocumentBatch

Our deduplicator should have written out parquet files with the IDs to remove. We read those files, drop the documents in the input dataframe whose IDs appear in the removal list, and return the filtered dataframe. As an optimization, we avoid loading the entire removal list into memory: we read only the IDs that fall within the min/max ID range of the input dataframe.