stages.deduplication.semantic.workflow#
End-to-End Semantic Deduplication Pipeline for Ray Curator.
This module contains the complete semantic deduplication workflow:
- K-means clustering on embedding data (always uses RayActorPoolExecutor)
- Pairwise similarity computation within clusters + duplicate identification (configurable executor)
Module Contents#
Classes#
SemanticDeduplicationWorkflow: End-to-End Semantic Deduplication Workflow.
API#
- class stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow(input_path: str | list[str], output_path: str, n_clusters: int, cache_path: str | None = None, id_field: str = 'id', embedding_field: str = 'embeddings', embedding_dim: int | None = None, metadata_fields: list[str] | None = None, input_filetype: typing.Literal['parquet', 'jsonl'] = 'parquet', input_file_extensions: list[str] | None = None, max_iter: int = 300, tol: float = 0.0001, random_state: int = 42, init: typing.Literal['k-means||', 'random'] | numpy.ndarray = 'k-means||', n_init: int | typing.Literal['auto'] = 1, oversampling_factor: float = 2.0, max_samples_per_batch: int = 1 << 15, distance_metric: typing.Literal['cosine', 'l2'] = 'cosine', which_to_keep: typing.Literal['hard', 'easy', 'random'] = 'hard', ranking_strategy: nemo_curator.stages.deduplication.semantic.ranking.RankingStrategy | None = None, pairwise_batch_size: int = 1024, eps: float | None = None, _duplicates_num_row_groups_hint: int | None = None, read_kwargs: dict[str, Any] | None = None, cache_kwargs: dict[str, Any] | None = None, write_kwargs: dict[str, Any] | None = None, clear_output: bool = True, verbose: bool = True)#
End-to-End Semantic Deduplication Workflow. It consists of the following stages:
1. KMeansStage: Takes the input path (embeddings) and clusters the embeddings into n_clusters. Writes data partitioned by centroid to cache_path.
2. PairwiseStage: Takes the output of KMeansStage and computes pairwise similarity between all embeddings in each cluster. Results are written to cache_path.
3. IdentifyDuplicatesStage (optional): Identifies duplicates based on the pairwise similarity scores. Runs only if eps is provided. Results are written to output_path.
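The three stages above can be sketched end to end with plain numpy. This is a toy illustration of the logic, not the actual NeMo Curator implementation: it uses a hand-rolled L2 k-means with seeded initialization (the real stage supports cosine and l2 metrics and runs distributed), and flags an item as a duplicate when its cosine similarity to an earlier item in the same cluster is at least 1 - eps.

```python
import numpy as np

rng = np.random.default_rng(42)

# Toy "embeddings": 3 groups of 4 near-duplicate vectors each.
base = rng.normal(size=(3, 8))
embeddings = np.vstack([base[i] + 0.01 * rng.normal(size=(4, 8)) for i in range(3)])
ids = np.arange(len(embeddings))

# Stage 1 (KMeansStage analogue): plain L2 k-means, seeded with one
# point per group so the demo is deterministic.
n_clusters = 3
centroids = embeddings[[0, 4, 8]].copy()
for _ in range(10):
    # Assign each point to its nearest centroid (L2 distance).
    dists = np.linalg.norm(embeddings[:, None, :] - centroids[None, :, :], axis=2)
    labels = dists.argmin(axis=1)
    # Recompute centroids, keeping the old one for any empty cluster.
    for c in range(n_clusters):
        if (labels == c).any():
            centroids[c] = embeddings[labels == c].mean(axis=0)

# Stage 2 (PairwiseStage analogue): cosine similarity within each cluster.
# Stage 3 (IdentifyDuplicatesStage analogue): flag items whose similarity
# to an earlier item in the same cluster is >= 1 - eps.
eps = 0.01
duplicate_ids = []
normed = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
for c in range(n_clusters):
    members = ids[labels == c]
    sims = normed[members] @ normed[members].T
    # Upper triangle: each item is compared only against earlier items,
    # so the first item of every duplicate group survives.
    tri = np.triu(sims, k=1)
    max_sim_to_earlier = tri.max(axis=0)
    duplicate_ids.extend(members[max_sim_to_earlier >= 1 - eps].tolist())

print(sorted(duplicate_ids))
```

In each group of four near-duplicates, the first member is kept and the other three are flagged for removal, which mirrors what the workflow writes to output_path (the IDs to remove).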
Initialization
Initialize the semantic deduplication workflow.
Args:
    input_path: Directory or list of directories containing input files with embeddings.
    output_path: Directory to write output files (i.e. the IDs to remove).
    n_clusters: Number of clusters for K-means.
    cache_path: Directory to write cache files (i.e. K-means and pairwise results). If None, it is set to output_path.

    # Core data configuration
    id_field: Name of the ID field in the data.
    embedding_field: Name of the embedding field in the data.
    embedding_dim: Embedding dimension (used for memory estimation).
    metadata_fields: List of metadata field names to preserve in the output.
    input_filetype: Type of input files ("parquet" or "jsonl").
    input_file_extensions: List of file extensions to process.

    # K-means clustering parameters
    max_iter: Maximum number of K-means iterations.
    tol: Tolerance for K-means convergence.
    random_state: Random seed for K-means.
    init: K-means initialization method.
    n_init: Number of K-means initializations.
    oversampling_factor: K-means++ oversampling factor.
    max_samples_per_batch: Maximum samples per batch for K-means.
    distance_metric: Distance metric for similarity ("cosine" or "l2").

    # Pairwise similarity parameters
    which_to_keep: Strategy for ranking within clusters ("hard", "easy", or "random").
    ranking_strategy: Custom ranking strategy (overrides which_to_keep).
    pairwise_batch_size: Batch size for pairwise similarity computation.

    # Duplicate identification parameters (optional)
    eps: Epsilon value for duplicate identification.
    _duplicates_num_row_groups_hint: Hint for the number of row groups used during duplicate removal.

    # I/O and storage parameters
    read_kwargs: Keyword arguments for reading files (including storage_options).
    write_kwargs: Keyword arguments for writing files (including storage_options).
    clear_output: Clear the output directory before running.

    # Execution parameters
    verbose: Enable verbose output.
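The which_to_keep options decide which member of a group of near-duplicates survives. A rough numpy illustration of the idea, under the assumption (common in SemDeDup-style deduplication, but a custom RankingStrategy may behave differently) that members of a cluster are ranked by cosine similarity to the cluster centroid: "hard" ranks the farthest-from-centroid (hardest) examples first, "easy" the closest, and "random" shuffles.

```python
import numpy as np

rng = np.random.default_rng(0)

# One cluster of 5 embeddings and its centroid.
cluster = rng.normal(size=(5, 8))
centroid = cluster.mean(axis=0)

# Cosine similarity of each member to the centroid.
normed = cluster / np.linalg.norm(cluster, axis=1, keepdims=True)
c = centroid / np.linalg.norm(centroid)
sim_to_centroid = normed @ c

# "hard": farthest from centroid first (ascending similarity);
# "easy": closest first (descending similarity).
rank_hard = np.argsort(sim_to_centroid)
rank_easy = np.argsort(-sim_to_centroid)

print("hard order:", rank_hard.tolist())
print("easy order:", rank_easy.tolist())
```

Whichever member is ranked first is treated as the representative to keep; lower-ranked near-duplicates become removal candidates.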
- run(pairwise_executor: nemo_curator.backends.base.BaseExecutor | None = None)#
Run the complete semantic deduplication pipeline.
Args:
    pairwise_executor: Executor for the pairwise stage. Defaults to XennaExecutor().

Returns:
    Dictionary with results and timing information.