stages.deduplication.exact.workflow#

Module Contents#

Classes#

ExactDeduplicationWorkflow

A pipeline that performs exact deduplication of a dataset.

Data#

API#

class stages.deduplication.exact.workflow.ExactDeduplicationWorkflow(
output_path: str,
input_path: str | list[str] | None = None,
input_filetype: Literal['jsonl', 'parquet'] = 'parquet',
input_blocksize: str | int = '2GiB',
input_file_extensions: list[str] | None = None,
read_kwargs: dict[str, Any] | None = None,
write_kwargs: dict[str, Any] | None = None,
assign_id: bool = True,
id_field: str | None = None,
text_field: str = 'text',
perform_removal: bool = False,
env_vars: dict[str, Any] | None = None,
)#

A pipeline that performs exact deduplication of a dataset. It consists of the following stages:

  • FilePartitioningStage Groups input files into smaller groups that can be processed in parallel.

  • ExactDuplicateIdentification Finds exact duplicates in a given column by hashing the column.

  • Removal (Optional) Currently not implemented.

Initialization

Configuration for exact duplicate detection.

Parameters

output_path: str
  Directory to store the duplicate IDs and the ID generator mapping for removal pipelines. It also stores the deduplicated output files if perform_removal is True.

input_path: str | list[str] | None
  Directory or list of files containing the input dataset. Unused if initial_tasks is provided during the workflow run.

input_filetype: Literal["jsonl", "parquet"]
  Format of the input dataset.

input_blocksize: str | int
  Size of the input blocks to read. An integer is interpreted as bytes; a string is interpreted as a size with a unit. Defaults to '2GiB'.

input_file_extensions: list[str] | None
  File extensions of the input dataset. If not provided, the default extensions for the input_filetype are used; if provided, they override those defaults.

read_kwargs: dict[str, Any] | None
  Additional keyword arguments to pass when reading the input files. This can include the storage_options dictionary when reading from remote storage.

write_kwargs: dict[str, Any] | None
  Additional keyword arguments to pass when writing deduplicated results to output_path. This can include the storage_options dictionary when writing to remote storage.

assign_id: bool
  Whether to automatically assign a unique ID to each document.

id_field: str | None
  Name of an existing ID field, used when a new ID is not assigned automatically.

text_field: str
  Field containing the text to deduplicate.

perform_removal: bool
  Whether to remove the duplicates from the original dataset.

env_vars: dict[str, Any] | None
  Environment variables to pass to the pipeline.
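A minimal usage sketch follows. It assumes the class is importable from nemo_curator.stages.deduplication.exact.workflow (the module path shown above); the file paths are placeholders.

```python
# Sketch: construct and run the exact-deduplication workflow.
# The import path and the file paths below are assumptions for illustration.
from nemo_curator.stages.deduplication.exact.workflow import ExactDeduplicationWorkflow

workflow = ExactDeduplicationWorkflow(
    output_path="/data/dedup_output",   # duplicate IDs and the ID generator mapping are written here
    input_path="/data/raw",             # directory (or list of files) holding the input dataset
    input_filetype="parquet",           # matches the default; "jsonl" is also supported
    input_blocksize="2GiB",             # string with a unit, or an integer number of bytes
    assign_id=True,                     # assign a unique ID to each document
    text_field="text",                  # column to hash for exact-duplicate detection
    perform_removal=False,              # only identify duplicates; removal is currently not implemented
)

workflow.run()  # builds input tasks from input_path and runs the pipeline
```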

run(
initial_tasks: list[nemo_curator.tasks.FileGroupTask] | None = None,
) -> None#

Run the deduplication pipeline.

Args:
  initial_tasks: Set of FileGroupTasks generated by a previous stage pointing to the dataset to be deduplicated. If not provided, the pipeline generates the input tasks from input_path and input_file_extensions.
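A hedged sketch of supplying pre-built tasks instead of letting the workflow partition input_path itself; previous_stage_tasks stands in for whatever list[FileGroupTask] an earlier stage produced.

```python
# Sketch: run the workflow with tasks produced by an earlier stage instead of
# letting it partition input_path. `previous_stage_tasks` is a placeholder for
# the list[FileGroupTask] output of an upstream stage; its construction is not shown.
from nemo_curator.tasks import FileGroupTask  # path as referenced in the signature above


def run_with_existing_tasks(
    workflow: "ExactDeduplicationWorkflow",
    previous_stage_tasks: list[FileGroupTask],
) -> None:
    # When initial_tasks is provided, input_path and input_file_extensions are unused.
    workflow.run(initial_tasks=previous_stage_tasks)
```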

stages.deduplication.exact.workflow.ID_GENERATOR_OUTPUT_FILENAME#

'exact_id_generator.json'
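A short sketch of locating the ID generator mapping after a run. It assumes the file is plain JSON written directly under output_path with the filename above; adjust if your output lives on remote storage.

```python
# Sketch: load the ID generator mapping written by the workflow.
# Assumes a plain JSON file stored directly under output_path.
import json
import os

ID_GENERATOR_OUTPUT_FILENAME = "exact_id_generator.json"  # value documented above


def load_id_generator_mapping(output_path: str) -> dict:
    with open(os.path.join(output_path, ID_GENERATOR_OUTPUT_FILENAME)) as f:
        return json.load(f)
```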