Deduplication#

Exact#

class nemo_curator.ExactDuplicates(
logger: logging.LoggerAdapter | str = './',
id_field: str = 'id',
text_field: str = 'text',
hash_method: str = 'md5',
perform_removal: bool = False,
profile_dir: str | None = None,
cache_dir: str | None = None,
)#

Find exact duplicates in a document corpus

hash_documents(
df: cudf.Series | pd.Series,
) cudf.Series | pd.Series#

Compute hashes for a Series containing documents

identify_duplicates(
dataset: DocumentDataset,
) DocumentDataset#

Find document IDs for exact duplicates in a given DocumentDataset.

Parameters:

dataset (DocumentDataset) – The input dataset in which to find exact duplicates.

Returns:

DocumentDataset containing IDs and hashes of all duplicate documents.
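To make the idea concrete, here is a minimal plain-Python sketch of exact duplicate detection by hashing. It is not the library's implementation (which operates on cudf or pandas Series); the function names and the list-of-tuples input are assumptions chosen for illustration, and MD5 is used only because it is the default hash_method shown in the signature above.

```python
import hashlib
from collections import defaultdict


def hash_texts(texts):
    """Return an MD5 hex digest for each document string."""
    return [hashlib.md5(t.encode("utf-8")).hexdigest() for t in texts]


def identify_exact_duplicates(docs):
    """docs: list of (doc_id, text) pairs (a hypothetical stand-in for a dataset).

    Returns (doc_id, hash) for every document whose hash occurs more than once.
    """
    hashes = hash_texts([text for _, text in docs])
    groups = defaultdict(list)
    for (doc_id, _), digest in zip(docs, hashes):
        groups[digest].append(doc_id)
    return [
        (doc_id, digest)
        for digest, ids in groups.items()
        if len(ids) > 1
        for doc_id in ids
    ]


docs = [("a", "hello world"), ("b", "hello world"), ("c", "something else")]
duplicates = identify_exact_duplicates(docs)
```

Only byte-identical texts collide here; any normalization (whitespace, case) would have to happen before hashing.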

remove(
dataset: DocumentDataset,
duplicates_to_remove: DocumentDataset | None,
) DocumentDataset#

Remove exact duplicates from a given DocumentDataset.

Parameters:
  • dataset (DocumentDataset) – The input dataset from which to remove exact duplicates.

  • duplicates_to_remove (DocumentDataset) – The dataset containing IDs of the exact duplicates to remove.

Returns:

DocumentDataset containing only non-duplicate documents.

Fuzzy#

class nemo_curator.BucketsToEdges(
cache_dir: str | None = None,
id_fields: list | str = 'id',
str_id_name: str = 'id',
bucket_field: str = '_bucket_id',
logger: logging.LoggerAdapter | str = './',
profile_dir: str | None = None,
)#

Maps buckets generated from LSH into an edgelist that can be processed further by Connected Components to find duplicate documents
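The relationship between buckets, edges, and connected components can be sketched in plain Python. This is an illustration under assumptions, not the library's code: the function names are hypothetical, and chaining neighbouring documents inside a bucket is one possible way to build the edge list (it needs only len(bucket) - 1 edges while keeping the bucket connected).

```python
from collections import defaultdict


def buckets_to_edges(doc_buckets):
    """doc_buckets: iterable of (doc_id, bucket_id) pairs.

    Links neighbouring documents (in sorted order) within each bucket.
    """
    members = defaultdict(list)
    for doc_id, bucket_id in doc_buckets:
        members[bucket_id].append(doc_id)
    edges = set()
    for ids in members.values():
        ids = sorted(set(ids))
        edges.update(zip(ids, ids[1:]))
    return sorted(edges)


def connected_components(edges):
    """Union-find over the edge list; returns {doc_id: group_representative}."""
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    for a, b in edges:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[max(root_a, root_b)] = min(root_a, root_b)
    return {node: find(node) for node in parent}


doc_buckets = [("d1", 0), ("d2", 0), ("d2", 1), ("d3", 1), ("d4", 2)]
edges = buckets_to_edges(doc_buckets)
groups = connected_components(edges)
```

Documents d1 and d3 never share a bucket, yet they land in the same group through d2. A document that is alone in its bucket (d4) produces no edge and so has no group.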

class nemo_curator.ConnectedComponents(
cache_dir: str,
jaccard_pairs_path: str,
id_column: str = 'id',
jaccard_threshold: float = 0.8,
logger: logging.LoggerAdapter | str = './',
profile_dir: str | None = None,
)#
class nemo_curator.FuzzyDuplicatesConfig(
cache_dir: str,
profile_dir: str | None = None,
id_field: str = 'id',
text_field: str = 'text',
perform_removal: bool = False,
seed: int = 42,
char_ngrams: int = 24,
num_buckets: int = 20,
hashes_per_bucket: int = 13,
use_64_bit_hash: bool = False,
buckets_per_shuffle: int = 1,
false_positive_check: bool = False,
num_anchors: int | None = None,
jaccard_threshold: float | None = None,
bucket_mapping_blocksize: int | None = None,
parts_per_worker: int | None = None,
bucket_parts_per_worker: int | None = None,
)#

Configuration for MinHash-based fuzzy duplicate detection.

Parameters:
  • seed (int) – Seed for minhash permutations.

  • char_ngrams (int) – Size of the character n-gram shingles used in minhash computation.

  • num_buckets (int) – Number of bands (buckets) to use during Locality Sensitive Hashing.

  • hashes_per_bucket (int) – Number of hashes per bucket/band.

  • use_64_bit_hash (bool) – Whether to use a 32-bit or 64-bit hash function for minhashing.

  • buckets_per_shuffle (int) – Larger values process larger batches by processing multiple bands at once, but might lead to memory pressure and related errors.

  • id_field (str) – Column in the dataset denoting the document ID.

  • text_field (str) – Column in the dataset denoting the document content.

  • perform_removal (bool) – Whether calling the module should remove the duplicates from the original dataset or return the list of IDs denoting duplicates.

  • profile_dir (str, Default None) – If specified, directory to write the Dask profile to.

  • cache_dir (str) – Location to store deduplication intermediates such as minhashes and buckets.

  • false_positive_check (bool) – Whether to run a check to look for false positives within buckets. Note: This is a computationally expensive step.

  • num_anchors (int) – Number of documents per bucket to use as reference for computing Jaccard pairs within that bucket to identify false positives.

  • jaccard_threshold (float) – The Jaccard similarity threshold to consider a document a near duplicate during false positive evaluations.
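To get a feel for how num_buckets and hashes_per_bucket interact, the sketch below evaluates the textbook LSH banding formula, P = 1 - (1 - s^r)^b, for a pair of documents with Jaccard similarity s. The formula is the standard analysis of banded MinHash and is an assumption here, not something this page states; the function name is hypothetical. With the defaults above, 20 bands of 13 hashes give 260 hashes in total, which matches the default num_hashes of MinHash on this page.

```python
def candidate_probability(similarity, num_buckets=20, hashes_per_bucket=13):
    """Probability that two documents agree on every hash of at least one band.

    Assumes each minhash matches independently with probability `similarity`.
    """
    return 1.0 - (1.0 - similarity ** hashes_per_bucket) ** num_buckets


# Total signature length implied by the default band layout.
num_hashes = 20 * 13

for s in (0.5, 0.7, 0.8, 0.9):
    print(f"jaccard={s:.1f} -> P(candidate)={candidate_probability(s):.3f}")
```

More hashes per band make each band stricter (fewer false positives), while more bands give a pair more chances to collide (fewer false negatives).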

class nemo_curator.FuzzyDuplicates(
config: FuzzyDuplicatesConfig,
logger: logging.LoggerAdapter | str = './',
perform_removal: bool = False,
)#
identify_duplicates(
dataset: DocumentDataset,
) DocumentDataset | None#
Parameters:

dataset (DocumentDataset) – The input dataset on which to compute fuzzy duplicates. Must contain a text field and a unique ID field.

Returns:

DocumentDataset containing IDs of all documents and the corresponding duplicate group they belong to. Documents in the same group are near duplicates.
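One way to consume an ID-to-group mapping like this is to keep a single document per group and mark the rest for removal. The sketch below is an assumption about downstream usage, not the library's removal logic; the function name, the tuple layout, and the choice to keep the first ID seen are all hypothetical.

```python
def ids_to_remove(id_group_pairs):
    """Keep the first ID seen in each duplicate group; return the others."""
    seen_groups = set()
    to_remove = []
    for doc_id, group in id_group_pairs:
        if group in seen_groups:
            to_remove.append(doc_id)
        else:
            seen_groups.add(group)
    return to_remove


pairs = [("d1", 0), ("d2", 0), ("d3", 1), ("d4", 0), ("d5", 2)]
removable = ids_to_remove(pairs)
```

Which member of a group survives is arbitrary here; a real pipeline might prefer the longest or the earliest-crawled document instead.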

remove(
dataset: DocumentDataset,
duplicates_to_remove: DocumentDataset | None,
) DocumentDataset#

Remove fuzzy duplicates from a given DocumentDataset.

Parameters:
  • dataset (DocumentDataset) – The input dataset from which to remove fuzzy duplicates.

  • duplicates_to_remove (DocumentDataset) – The dataset containing IDs of the fuzzy duplicates to remove.

Returns:

DocumentDataset containing only non-duplicate documents.

class nemo_curator.JaccardSimilarity(
id_field: str = 'id',
anchor_id_fields: list[str] | None = None,
text_field: str = 'text',
ngram_width: int = 5,
)#
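
JaccardSimilarity carries no description on this page. As a rough illustration of the measure itself, the sketch below computes Jaccard similarity over character n-grams. Treating ngram_width as a character n-gram width is an assumption, and the helper names are hypothetical.

```python
def char_ngrams(text, width=5):
    """Set of character n-grams; short texts yield the text itself."""
    if len(text) < width:
        return {text}
    return {text[i:i + width] for i in range(len(text) - width + 1)}


def jaccard(text_a, text_b, width=5):
    """Size of the n-gram intersection divided by the size of the union."""
    a, b = char_ngrams(text_a, width), char_ngrams(text_b, width)
    return len(a & b) / len(a | b)


score = jaccard("the quick brown fox", "the quick brown fax")
```

A pair would count as a near duplicate when this score meets the configured jaccard_threshold.
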
class nemo_curator.LSH(
cache_dir: str,
num_hashes: int,
num_buckets: int,
buckets_per_shuffle: int = 1,
false_positive_check: bool = False,
logger: logging.LoggerAdapter | str = './',
id_fields: str | list = 'id',
minhash_field: str = '_minhash_signature',
profile_dir: str | None = None,
)#

Performs LSH on MinHash signatures.
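As a sketch of what banded LSH over a MinHash signature looks like, the plain-Python example below splits a signature into num_buckets bands and derives one bucket key per band. It illustrates the general technique only; the function names are hypothetical, and MD5 is an arbitrary choice for turning a band into a key.

```python
import hashlib


def band_buckets(signature, num_buckets):
    """Split a signature into equal bands and return one bucket key per band."""
    if len(signature) % num_buckets:
        raise ValueError("signature length must be divisible by num_buckets")
    rows = len(signature) // num_buckets
    keys = []
    for band in range(num_buckets):
        chunk = tuple(signature[band * rows:(band + 1) * rows])
        # Include the band index so equal chunks in different bands do not collide.
        keys.append(hashlib.md5(repr((band, chunk)).encode("utf-8")).hexdigest())
    return keys


def share_bucket(sig_a, sig_b, num_buckets):
    """True if the two signatures fall into the same bucket for any band."""
    pairs = zip(band_buckets(sig_a, num_buckets), band_buckets(sig_b, num_buckets))
    return any(a == b for a, b in pairs)


sig_a = [1, 2, 3, 4, 5, 6]
sig_b = [1, 2, 9, 9, 9, 9]  # agrees with sig_a on the first band only
sig_c = [7, 7, 7, 7, 7, 8]  # agrees with sig_a on no band
```

Two documents become duplicate candidates as soon as one band matches in full, which is why sig_a and sig_b collide with three bands of two hashes each.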

bucket_id_to_int(
bucket_ddf: dask_cudf.DataFrame,
bucket_col_name: str = 'bucket_id',
start_id: int = 0,
) tuple[dask_cudf.DataFrame, int]#

Maps bucket IDs to a contiguous integer range starting from start_id.
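The remapping can be pictured with a small plain-Python sketch. It works on a list rather than a dask_cudf.DataFrame, and the meaning of the returned integer is not documented on this page: the sketch assumes it is the last ID that was assigned, which is a guess.

```python
def remap_bucket_ids(bucket_ids, start_id=0):
    """Map arbitrary bucket IDs to consecutive integers beginning at start_id.

    Returns (remapped_ids, last_assigned_id). The second value is an assumed
    interpretation of the integer in the documented return tuple.
    """
    mapping = {}
    for bucket in bucket_ids:
        if bucket not in mapping:
            mapping[bucket] = start_id + len(mapping)
    remapped = [mapping[bucket] for bucket in bucket_ids]
    return remapped, start_id + len(mapping) - 1


remapped, last_id = remap_bucket_ids(["b9", "b2", "b9", "b7"], start_id=10)
```

Passing the returned value plus one as the next start_id would keep IDs unique across successive batches of buckets.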

lsh(write_path: str, df: dask_cudf.DataFrame) bool#

Computes hash buckets for the DataFrame and writes them as parquet files to the specified path.

Parameters:
  • write_path (str) – The directory path to write parquet files.

  • df (dask_cudf.DataFrame) – The input DataFrame with minhashes to be bucketed.

Returns:

True if buckets were empty (no duplicates found), False otherwise.

Return type:

bool

class nemo_curator.MinHash(
seed: int = 42,
num_hashes: int = 260,
char_ngrams: int = 24,
use_64bit_hash: bool = False,
logger: logging.LoggerAdapter | str = './',
id_field: str = 'id',
text_field: str = 'text',
profile_dir: str | None = None,
cache_dir: str | None = None,
)#

Computes minhash signatures of a document corpus

generate_hash_permutation_seeds(
bit_width: int,
n_permutations: int = 260,
seed: int = 0,
) numpy.ndarray#

Generate seeds for all minhash permutations based on the given seed.

minhash32(
ser: cudf.Series,
seeds: numpy.ndarray,
char_ngram: int,
) cudf.Series#

Compute 32bit minhashes based on the MurmurHash3 algorithm

minhash64(
ser: cudf.Series,
seeds: numpy.ndarray,
char_ngram: int,
) cudf.Series#

Compute 64bit minhashes based on the MurmurHash3 algorithm
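The following plain-Python sketch shows the general shape of a seeded MinHash over character n-grams. It is not the GPU implementation: salted BLAKE2b from hashlib stands in for MurmurHash3 (which is not in the standard library), and all function names and defaults are illustrative assumptions.

```python
import hashlib
import random


def generate_seeds(n_permutations=260, seed=0):
    """Derive one 32-bit seed per permutation from a single base seed."""
    rng = random.Random(seed)
    return [rng.getrandbits(32) for _ in range(n_permutations)]


def char_shingles(text, char_ngram):
    """Set of character n-grams; short texts yield the text itself."""
    if len(text) <= char_ngram:
        return {text}
    return {text[i:i + char_ngram] for i in range(len(text) - char_ngram + 1)}


def minhash32(text, seeds, char_ngram):
    """For each seed, keep the smallest 32-bit hash over all shingles."""
    shingles = [s.encode("utf-8") for s in char_shingles(text, char_ngram)]
    signature = []
    for seed in seeds:
        salt = seed.to_bytes(4, "little")
        signature.append(min(
            int.from_bytes(
                hashlib.blake2b(s, digest_size=4, salt=salt).digest(), "little"
            )
            for s in shingles
        ))
    return signature


def estimated_jaccard(sig_a, sig_b):
    """Fraction of signature positions on which the two documents agree."""
    return sum(a == b for a, b in zip(sig_a, sig_b)) / len(sig_a)


seeds = generate_seeds(64, seed=42)
sig_a = minhash32("the quick brown fox jumps over the lazy dog", seeds, 5)
sig_b = minhash32("the quick brown fox jumps over the lazy cat", seeds, 5)
similarity = estimated_jaccard(sig_a, sig_b)
```

The fraction of matching positions estimates the Jaccard similarity of the two shingle sets, and the estimate tightens as the number of seeds grows.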

Semantic#

class nemo_curator.SemDedup(
config: SemDedupConfig,
input_column: str = 'text',
id_column: str = 'id',
perform_removal: bool = False,
logger: Logger | str = './',
)#
identify_duplicates(
dataset: DocumentDataset,
) DocumentDataset#

Identify duplicates in the dataset. Returns the IDs of documents that are duplicates of each other.

remove(
dataset: DocumentDataset,
duplicates_to_remove: DocumentDataset,
) DocumentDataset#

Remove duplicates from the dataset.

class nemo_curator.SemDedupConfig(
cache_dir: str,
profile_dir: str | None = None,
num_files: int = -1,
embedding_model_name_or_path: str = 'sentence-transformers/all-MiniLM-L6-v2',
embedding_batch_size: int = 128,
embeddings_save_loc: str = 'embeddings',
embedding_max_mem_gb: int | None = None,
embedding_pooling_strategy: str = 'mean_pooling',
embedding_column: str = 'embeddings',
write_embeddings_to_disk: bool = True,
write_to_filename: bool = False,
max_iter: int = 100,
n_clusters: int = 1000,
clustering_save_loc: str = 'clustering_results',
random_state: int = 1234,
sim_metric: Literal['cosine', 'l2'] = 'cosine',
which_to_keep: Literal['hard', 'easy', 'random'] = 'hard',
batched_cosine_similarity: bool | int = 1024,
clustering_input_partition_size: str = '2gb',
eps_to_extract: float = 0.01,
)#

Configuration for Semantic Deduplication.

cache_dir#

Directory to store cache.

Type:

str

profile_dir#

If specified, directory to write Dask profile. Default is None.

Type:

Optional[str]

num_files#

Number of files. Default is -1, meaning all files.

Type:

int

embedding_model_name_or_path#

Model name or path for embeddings. Default is “sentence-transformers/all-MiniLM-L6-v2”.

Type:

str

embedding_batch_size#

Initial batch size for processing embeddings. Default is 128.

Type:

int

embeddings_save_loc#

Location to save embeddings. Default is “embeddings”.

Type:

str

embedding_max_mem_gb#

Maximum memory usage in GB for the embedding process. If None, it defaults to the available GPU memory minus 4 GB.

Type:

Optional[int]

embedding_pooling_strategy#

Strategy for pooling embeddings, either “mean_pooling” or “last_token”. Default is “mean_pooling”.

Type:

str

embedding_column#

The column name that stores the embeddings. Default is “embeddings”.

Type:

str

write_embeddings_to_disk#

If True, saves the embeddings to disk. We recommend setting this to False when you have a delayed pipeline. Setting it to False can lead to more memory overhead. Default is True.

Type:

bool

write_to_filename#

If True, saves the embeddings to the same filename as input files. Default False.

Type:

bool

max_iter#

Maximum iterations for clustering. The more iterations, the better the clustering. Default is 100.

Type:

int

n_clusters#

Number of clusters. Default is 1000.

Type:

int

clustering_save_loc#

Location to save clustering results. Default is “clustering_results”.

Type:

str

random_state#

KMeans random state used for reproducibility. Default is 1234.

Type:

int

sim_metric#

Similarity metric used to rank points within a cluster. Default is “cosine”. which_to_keep determines how points within each cluster are ranked, based on their similarity to the centroid as defined by sim_metric.

Type:

“cosine” or “l2”

which_to_keep#

Method to determine which duplicates to keep. Default is “hard”.

  • hard retains edge-case or outlier items farthest from the centroid by sorting points by decreasing distance from the centroid.

  • easy retains representative items closest to the centroid by sorting points by increasing distance from the centroid.

  • random retains items randomly.

Type:

str
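The three which_to_keep orderings can be sketched in a few lines of plain Python. The function name, the dict input, and the seed argument are assumptions for illustration; only the sort directions come from the description above.

```python
import random


def rank_cluster(distances, which_to_keep="hard", seed=0):
    """distances: {item_id: distance_from_centroid}.

    Returns item IDs in keep-priority order (earlier items are preferred).
    """
    ids = list(distances)
    if which_to_keep == "hard":
        # Farthest from the centroid first.
        return sorted(ids, key=lambda i: distances[i], reverse=True)
    if which_to_keep == "easy":
        # Closest to the centroid first.
        return sorted(ids, key=lambda i: distances[i])
    if which_to_keep == "random":
        random.Random(seed).shuffle(ids)
        return ids
    raise ValueError(f"unknown which_to_keep: {which_to_keep!r}")


distances = {"a": 0.1, "b": 0.9, "c": 0.5}
hard_order = rank_cluster(distances, "hard")
easy_order = rank_cluster(distances, "easy")
```

The ordering matters because, among items judged to be duplicates of one another, the one ranked first is the one that is kept.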

batched_cosine_similarity#

Whether to use batched cosine similarity, which uses less memory. Default is 1024. When False or 0, no batching is used and memory requirements are O(N^2), where N is the number of items in the cluster. When True, the batch size is set to 1024 and memory requirements are O(N*B), where B is the batch size.

Type:

Union[bool, int]
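The memory trade-off can be illustrated with a plain-Python sketch that computes, for each item in a ranked cluster, its highest cosine similarity to any earlier item. Rows are processed in batches so that at most B × N similarities exist at once. The function names and the list-of-lists input are assumptions; the library's GPU implementation is not shown here.

```python
import math


def normalize(vector):
    """Scale a vector to unit length (zero vectors are left unchanged)."""
    norm = math.sqrt(sum(x * x for x in vector)) or 1.0
    return [x / norm for x in vector]


def max_similarity_to_earlier(embeddings, batch_size=1024):
    """For each row, the largest cosine similarity to any preceding row.

    The first row has no predecessor and gets -1.0, the minimum cosine value.
    """
    unit = [normalize(v) for v in embeddings]
    n = len(unit)
    if batch_size is True:
        size = 1024
    elif not batch_size:
        size = max(n, 1)  # no batching: one block covering every row
    else:
        size = int(batch_size)
    best = [-1.0] * n
    for start in range(0, n, size):
        batch = unit[start:start + size]
        # Only this len(batch) x n block of similarities is held in memory.
        block = [[sum(a * b for a, b in zip(row, other)) for other in unit]
                 for row in batch]
        for offset, sims in enumerate(block):
            earlier = sims[:start + offset]
            if earlier:
                best[start + offset] = max(earlier)
    return best


embeddings = [[1.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]]
batched = max_similarity_to_earlier(embeddings, batch_size=2)
unbatched = max_similarity_to_earlier(embeddings, batch_size=False)
```

Batching changes only peak memory, not the result, so the batched and unbatched calls agree.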

clustering_input_partition_size#

The size of data partition with which to run KMeans. Default is “2gb”. If None, then the dataset is not repartitioned.

Type:

Optional[str]

eps_to_extract#

Epsilon value to extract deduplicated data. Default is 0.01.

Type:

float

class nemo_curator.EmbeddingCreator(
embedding_model_name_or_path: str = 'sentence-transformers/all-MiniLM-L6-v2',
embedding_batch_size: int = 128,
embedding_output_dir: str = './embeddings',
embedding_max_mem_gb: int | None = None,
embedding_pooling_strategy: str = 'mean_pooling',
input_column: str = 'text',
embedding_column: str = 'embeddings',
write_embeddings_to_disk: bool = True,
write_to_filename: bool = False,
logger: Logger | str = './',
profile_dir: str | None = None,
)#
class nemo_curator.ClusteringModel(
id_column: str = 'id',
max_iter: int = 100,
n_clusters: int = 1000,
clustering_output_dir: str = './clustering_results',
embedding_column: str = 'embeddings',
random_state: int = 1234,
clustering_input_partition_size: str | None = '2gb',
logger: Logger | str = './',
profile_dir: str | None = None,
keep_all_columns: bool = False,
)#
class nemo_curator.SemanticClusterLevelDedup(
n_clusters: int = 1000,
emb_by_clust_dir: str = './clustering_results/embs_by_nearest_center',
id_column: str = 'id',
which_to_keep: str = 'hard',
sim_metric: Literal['cosine', 'l2'] = 'cosine',
output_dir: str = './clustering_results',
embedding_column: str = 'embeddings',
batched_cosine_similarity: int = 1024,
logger: Logger | str = './',
profile_dir: str | None = None,
)#
extract_dedup_data(
eps_to_extract: float,
) DocumentDataset#

Extract similar records that are within the epsilon threshold. These records can be removed from the dataset.

Parameters:

eps_to_extract (float) – Epsilon threshold for extracting deduplicated data.

Returns:

Dataset containing the list of IDs that can be removed.

Return type:

DocumentDataset
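A minimal sketch of how an epsilon threshold might select removable IDs is given below. It assumes the usual semantic deduplication rule, where an item is flagged when its highest cosine similarity to a higher-ranked item in the same cluster exceeds 1 - eps. That rule, the function name, and the input layout are assumptions, not details confirmed by this page.

```python
def ids_within_eps(ranked_ids, max_similarity, eps_to_extract=0.01):
    """Return the IDs whose similarity to a higher-ranked item exceeds 1 - eps.

    ranked_ids and max_similarity are parallel lists in keep-priority order.
    """
    threshold = 1.0 - eps_to_extract
    return [
        doc_id
        for doc_id, similarity in zip(ranked_ids, max_similarity)
        if similarity > threshold
    ]


ranked_ids = ["a", "b", "c", "d"]
max_similarity = [-1.0, 0.2, 0.995, 0.98]
strict = ids_within_eps(ranked_ids, max_similarity, eps_to_extract=0.01)
loose = ids_within_eps(ranked_ids, max_similarity, eps_to_extract=0.05)
```

A larger epsilon lowers the threshold, so more items are treated as duplicates and removed.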