stages.text.deduplication.removal#
Removal stage for distributed deduplication pipeline.
This stage implements the removal phase of the distributed deduplication approach:

1. Takes a DocumentBatch and determines the min/max ID range
2. Filters the parquet files for IDs to remove within this range
3. Filters out documents based on the removal list
4. Returns the filtered DocumentBatch
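The following is a minimal sketch of these four steps on plain pandas dataframes, intended only to illustrate the phases; it is not the stage's actual implementation, and the function name, column names, and defaults are assumptions.

```python
import pandas as pd

def removal_phase(
    batch_df: pd.DataFrame,    # documents from the incoming DocumentBatch
    removal_df: pd.DataFrame,  # IDs to remove, as produced by the identification step
    id_field: str = "id",
    duplicate_id_field: str = "id",
) -> pd.DataFrame:
    # 1. Determine the min/max ID range of the incoming batch.
    min_id, max_id = batch_df[id_field].min(), batch_df[id_field].max()

    # 2. Restrict the removal list to IDs that fall inside this range.
    mask = removal_df[duplicate_id_field].between(min_id, max_id)
    in_range_ids = removal_df.loc[mask, duplicate_id_field]

    # 3. Filter out documents whose IDs appear in the removal list.
    filtered = batch_df[~batch_df[id_field].isin(in_range_ids)]

    # 4. Return the filtered documents.
    return filtered
```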
Module Contents#
Classes#
| Class | Description |
|---|---|
| TextDuplicatesRemovalStage | Stage for removing duplicate documents based on pre-computed removal lists. |
API#
- class stages.text.deduplication.removal.TextDuplicatesRemovalStage#
Bases: nemo_curator.stages.base.ProcessingStage[nemo_curator.tasks.DocumentBatch, nemo_curator.tasks.DocumentBatch]

Stage for removing duplicate documents based on pre-computed removal lists.
Args:

- ids_to_remove_path: Path to parquet files containing IDs to remove.
- id_field: Field to use for deduplication within the input dataframe. Defaults to CURATOR_DEDUP_ID_STR.
- duplicate_id_field: Field to use for deduplication within the removal dataframe. Defaults to "id".
- read_kwargs: Additional arguments for reading parquet files.
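For illustration, a hedged construction sketch using the documented arguments; the import path follows the module name shown on this page under the nemo_curator package, and the file path and keyword values are placeholders, not prescribed settings.

```python
from nemo_curator.stages.text.deduplication.removal import TextDuplicatesRemovalStage

removal_stage = TextDuplicatesRemovalStage(
    ids_to_remove_path="/data/dedup/ids_to_remove/",  # placeholder: output of the duplicate-identification step
    duplicate_id_field="id",                          # ID column inside the removal parquet files
    read_kwargs=None,                                 # or extra kwargs for the parquet reader
)
```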
- duplicate_id_field: str#
'id'
- id_field: str#
None
- ids_to_remove_path: str#
None
- inputs() tuple[list[str], list[str]] #
Define stage input requirements.
Returns (tuple[list[str], list[str]]): Tuple of (required_attributes, required_columns) where:

- required_top_level_attributes: List of task attributes that must be present
- required_data_attributes: List of attributes within the data that must be present
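Continuing the construction sketch above, the return value can be unpacked directly; the comment below is an assumption about what the column list contains, not documented behavior.

```python
required_attributes, required_columns = removal_stage.inputs()
# required_columns is expected to include the configured id_field, since the
# stage needs that column to compute a batch's min/max ID range.
```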
- process(task: nemo_curator.tasks.DocumentBatch) nemo_curator.tasks.DocumentBatch #
The deduplicator should already have written out a parquet file with the IDs to remove. This method reads that file, filters out the documents in the input dataframe whose IDs appear in the removal list, and returns the filtered dataframe. As an optimization, it does not load the entire removal list into memory; it loads only the IDs that fall within the ID range of the input dataframe.
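To make the optimization concrete, here is a hedged sketch contrasting a full read of the removal list with a range-restricted read using pyarrow's predicate pushdown; the path, column name, and ID values are placeholders for the example.

```python
import pyarrow.parquet as pq

ids_to_remove_path = "/data/dedup/ids_to_remove/"  # placeholder
min_id, max_id = 1_000, 1_999                      # ID range of the incoming batch

# Naive: materializes every ID in the removal list, regardless of the batch.
all_ids = pq.read_table(ids_to_remove_path, columns=["id"])

# Range-restricted: only rows whose ID falls within [min_id, max_id] are read,
# so memory use scales with the batch rather than with the whole corpus.
in_range_ids = pq.read_table(
    ids_to_remove_path,
    columns=["id"],
    filters=[("id", ">=", min_id), ("id", "<=", max_id)],
)
```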
- read_kwargs: dict[str, Any] | None#
None