stages.deduplication.fuzzy.workflow
Module Contents

Classes

FuzzyDeduplicationWorkflow: A pipeline that performs fuzzy deduplication of a dataset.

Data

API
- class stages.deduplication.fuzzy.workflow.FuzzyDeduplicationWorkflow(
- cache_path: str,
- output_path: str,
- input_path: str | list[str] | None = None,
- input_filetype: Literal['jsonl', 'parquet'] = 'parquet',
- input_blocksize: str | int = '1GiB',
- input_file_extensions: list[str] | None = None,
- read_kwargs: dict[str, Any] | None = None,
- cache_kwargs: dict[str, Any] | None = None,
- write_kwargs: dict[str, Any] | None = None,
- text_field: str = 'text',
- perform_removal: bool = False,
- seed: int = 42,
- char_ngrams: int = 24,
- num_bands: int = 20,
- minhashes_per_band: int = 13,
- use_64_bit_hash: bool = False,
- bands_per_iteration: int = 5,
- env_vars: dict[str, Any] | None = None,
)
A pipeline that performs fuzzy deduplication of a dataset. It consists of the following stages:

- FilePartitioningStage: Groups input files into smaller groups that can be processed in parallel.
- MinHashStage: Computes minhashes for the input dataset.
- LSHStage: Performs locality sensitive hashing on the minhashes. This is a shuffle stage that involves moving data between workers (see the probability sketch after this list).
- BucketsToEdgesStage: Converts the resulting LSH mapping of bucket ID to document ID into a graph of edges.
- ConnectedComponentsStage: Performs weakly connected components clustering on the graph represented by the edge list.
- IdentifyDuplicatesStage: Generates a list of document IDs to remove based on the connected components clusters.
- Removal (optional): Currently not implemented.
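How aggressively the LSH stage pairs up candidate duplicates is governed by the band/row split. The following is a minimal sketch using the standard MinHash-LSH collision formula (general theory, not code taken from this workflow), with the defaults from the signature above:

```python
# Probability that two documents with Jaccard similarity `s` share at least
# one LSH bucket when the signature is split into `bands` bands of `rows`
# minhashes each: P = 1 - (1 - s**rows) ** bands.
# The defaults mirror num_bands=20 and minhashes_per_band=13 from the signature.
def collision_probability(s: float, bands: int = 20, rows: int = 13) -> float:
    return 1.0 - (1.0 - s**rows) ** bands

for s in (0.5, 0.7, 0.8, 0.9):
    print(f"similarity {s:.1f} -> P(candidate pair) = {collision_probability(s):.3f}")
```

Increasing `num_bands` (or decreasing `minhashes_per_band`) catches lower-similarity pairs at the cost of more candidate pairs to verify downstream.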
Initialization
Configuration for MinHash-based fuzzy duplicate detection.

Parameters

- cache_path (str): Directory to store deduplication intermediates such as minhashes, buckets, etc.
- output_path (str): Directory to store the duplicate IDs and the ID generator mapping for removal pipelines. It also stores the deduplicated output files if perform_removal is True.
- input_path (str | list[str] | None): Directory or list of files containing the input dataset. Unused if initial_tasks is provided during the workflow run.
- input_filetype (Literal["jsonl", "parquet"]): Format of the input dataset.
- input_blocksize (str | int): Size of the input blocks to read in. An integer is interpreted as bytes; a string is interpreted as a size with a unit. Defaults to 1GiB.
- input_file_extensions (list[str] | None): File extensions of the input dataset. If not provided, the default extensions for the input_filetype are used; if provided, they override those defaults.
- read_kwargs (dict[str, Any] | None): Additional keyword arguments to pass when reading the input files. This can include the storage_options dictionary when reading from remote storage.
- cache_kwargs (dict[str, Any] | None): Additional keyword arguments to pass for intermediate files written to cache_path. This can include the storage_options dictionary when writing to remote storage.
- write_kwargs (dict[str, Any] | None): Additional keyword arguments to pass for deduplicated results written to output_path. This can include the storage_options dictionary when writing to remote storage.
- text_field (str): Field containing the text to deduplicate.
- perform_removal (bool): Whether to remove the duplicates from the original dataset.
- seed (int): Seed for minhash permutations.
- char_ngrams (int): Size of character n-gram shingles used in minhash computation.
- num_bands (int): Number of bands (buckets) to use during locality sensitive hashing.
- minhashes_per_band (int): Number of hashes per band/bucket.
- use_64_bit_hash (bool): Whether to use a 32-bit or 64-bit hash function for minhashing.
- bands_per_iteration (int): Number of bands/buckets to shuffle concurrently. Larger values process bigger batches by shuffling multiple bands at once, but may lead to memory pressure and related errors.
- env_vars (dict[str, Any] | None): Environment variables to pass to the pipeline.
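A minimal construction sketch under stated assumptions: the import path is inferred from the module name at the top of this page (the `nemo_curator` package root is assumed from the `FileGroupTask` reference in `run()`), and all file paths are placeholders. Parameter names and defaults come from the signature above.

```python
# Import path inferred from the documented module name; verify against your install.
from nemo_curator.stages.deduplication.fuzzy.workflow import FuzzyDeduplicationWorkflow

workflow = FuzzyDeduplicationWorkflow(
    cache_path="/scratch/fuzzy_dedup_cache",  # intermediates: minhashes, buckets, ...
    output_path="/results/fuzzy_dedup",       # duplicate IDs and ID generator mapping
    input_path="/data/corpus",                # directory (or list) of input files
    input_filetype="parquet",
    text_field="text",
    perform_removal=False,                    # removal stage is currently not implemented
    num_bands=20,                             # defaults spelled out for clarity
    minhashes_per_band=13,
    bands_per_iteration=5,
)
```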
- run(
- initial_tasks: list[nemo_curator.tasks.FileGroupTask] | None = None,
)
Run the deduplication pipeline.
Args:

- initial_tasks: Set of FileGroupTasks generated by a previous stage pointing to the dataset to be deduplicated. If not provided, the pipeline generates the input tasks based on input_path and input_file_extensions.
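A hedged usage sketch of `run()`, continuing the `workflow` object constructed above: the no-argument call relies on `input_path` and `input_file_extensions`, while `previous_tasks` is a placeholder name for `FileGroupTask` objects produced by an upstream stage, not part of this API.

```python
# Simplest form: the workflow builds its own file-group tasks from
# input_path and input_file_extensions.
workflow.run()

# Alternatively, reuse FileGroupTasks produced by an earlier pipeline stage;
# `previous_tasks` is a placeholder variable.
# workflow.run(initial_tasks=previous_tasks)
```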
- stages.deduplication.fuzzy.workflow.ID_GENERATOR_OUTPUT_FILENAME
'fuzzy_id_generator.json'
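Since output_path stores the ID generator mapping, the constant above presumably names that file within it. A small sketch under that assumption (the exact location of the file is not confirmed by this page):

```python
import os

# Hypothetical import path, mirroring the documented module name.
from nemo_curator.stages.deduplication.fuzzy.workflow import ID_GENERATOR_OUTPUT_FILENAME

# Assumes the mapping is written directly under output_path (illustrative path).
id_generator_path = os.path.join("/results/fuzzy_dedup", ID_GENERATOR_OUTPUT_FILENAME)
print(id_generator_path)  # /results/fuzzy_dedup/fuzzy_id_generator.json
```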