nemo_curator.stages.deduplication.semantic.workflow


End-to-End Semantic Deduplication Pipeline for Ray Curator.

This module contains the complete semantic deduplication workflow:

  1. K-means clustering on embedding data (always uses RayActorPoolExecutor)
  2. Pairwise similarity computation within clusters + duplicate identification (configurable executor)
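
Step 1 can be illustrated with a plain Lloyd's k-means in pure Python. This is a simplified stand-in, not the workflow's implementation. The workflow defaults (init='k-means||', oversampling_factor, max_samples_per_batch) suggest a scalable k-means whose initialization and convergence test differ from this sketch. Here, as an assumption, initialization picks k random input points and tol is treated as the largest centroid shift between iterations.

```python
import math
import random


def kmeans(points, k, max_iter=300, tol=1e-4, seed=42):
    """Plain Lloyd's k-means. Returns (centroids, labels), one label per point."""
    rng = random.Random(seed)
    # Random initialization: k distinct input points become the starting centroids.
    centroids = [list(p) for p in rng.sample(points, k)]
    labels = [0] * len(points)
    for _ in range(max_iter):
        # Assignment step: attach every point to its nearest centroid (Euclidean).
        labels = [min(range(k), key=lambda c: math.dist(p, centroids[c])) for p in points]
        # Update step: move each centroid to the mean of its members.
        shift = 0.0
        for c in range(k):
            members = [p for p, label in zip(points, labels) if label == c]
            if not members:
                continue  # an empty cluster keeps its previous centroid
            new_centroid = [sum(dim) / len(members) for dim in zip(*members)]
            shift = max(shift, math.dist(new_centroid, centroids[c]))
            centroids[c] = new_centroid
        # Stop once no centroid moved by more than tol.
        if shift <= tol:
            break
    return centroids, labels


points = [[0.0, 0.0], [0.0, 1.0], [10.0, 10.0], [10.0, 11.0]]
centroids, labels = kmeans(points, k=2)
# Points 0 and 1 share one label; points 2 and 3 share the other.
```

In the workflow, each resulting cluster is then processed independently in step 2, which is what keeps the pairwise comparison tractable.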

Module Contents

Classes

Name                             Description
SemanticDeduplicationWorkflow    End-to-End Semantic Deduplication Workflow.

Data

MIN_RECOMMENDED_N_CLUSTERS

API

class nemo_curator.stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow(
input_path: str | list[str],
output_path: str,
n_clusters: int,
cache_path: str | None = None,
id_field: str = 'id',
embedding_field: str = 'embeddings',
embedding_dim: int | None = None,
metadata_fields: list[str] | None = None,
input_filetype: typing.Literal['parquet', 'jsonl'] = 'parquet',
input_file_extensions: list[str] | None = None,
max_iter: int = 300,
tol: float = 0.0001,
random_state: int = 42,
init: typing.Literal['k-means||', 'random'] | numpy.ndarray = 'k-means||',
n_init: int | typing.Literal['auto'] = 1,
oversampling_factor: float = 2.0,
max_samples_per_batch: int = 1 << 15,
distance_metric: typing.Literal['cosine', 'l2'] = 'cosine',
which_to_keep: typing.Literal['hard', 'easy', 'random'] = 'hard',
ranking_strategy: nemo_curator.stages.deduplication.semantic.ranking.RankingStrategy | None = None,
pairwise_batch_size: int = 1024,
eps: float | None = None,
_duplicates_num_row_groups_hint: int | None = None,
read_kwargs: dict[str, typing.Any] | None = None,
cache_kwargs: dict[str, typing.Any] | None = None,
write_kwargs: dict[str, typing.Any] | None = None,
clear_output: bool = True,
verbose: bool = True
)
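
distance_metric accepts 'cosine' or 'l2'. The signature does not state exactly where the metric is applied; one plausible reading (an assumption) is that it measures each embedding's distance to its cluster centroid. For unit-length vectors the two metrics rank items identically, because ||a - b||^2 = 2(1 - cos(a, b)). The sketch below checks that identity and the resulting ordering; the helper names are illustrative only.

```python
import math


def normalize(v):
    """Scale v to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def cosine_distance(a, b):
    """1 - cosine similarity (inputs need not be normalized)."""
    dot = sum(x * y for x, y in zip(a, b))
    norms = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norms


def l2_distance(a, b):
    """Euclidean distance."""
    return math.dist(a, b)


centroid = normalize([1.0, 1.0])
points = [normalize(p) for p in ([1.0, 0.0], [1.0, 0.9], [0.2, 1.0], [-1.0, 0.2])]

# Rank point indices by distance to the centroid under each metric.
by_cosine = sorted(range(len(points)), key=lambda i: cosine_distance(points[i], centroid))
by_l2 = sorted(range(len(points)), key=lambda i: l2_distance(points[i], centroid))
# Both orderings are [1, 2, 0, 3] for these unit vectors.
```

For embeddings that are not normalized, the two metrics can disagree, since l2 is also sensitive to vector length.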

Bases: WorkflowBase

End-to-End Semantic Deduplication Workflow. It consists of the following stages:

  • KMeansStage: reads the embeddings from input_path, clusters them into n_clusters clusters, and writes the data, partitioned by centroid, to cache_path (under kmeans_results).
  • PairwiseStage: takes the output of KMeansStage and computes the pairwise similarity between all embeddings within each cluster. The results are written to cache_path (under pairwise_results).
  • IdentifyDuplicatesStage (optional): identifies duplicates from the pairwise similarity scores. It runs only if eps is provided, and its results are written to output_path (under duplicates).
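
The pairwise and duplicate-identification stages can be sketched for a single cluster in the style of SemDeDup. This is a minimal illustration under stated assumptions, not the library's code: members are ranked by cosine distance to the centroid ('hard' keeps the farthest items first, 'easy' the closest; 'random' and ranking_strategy are not modeled), and an item is flagged when its similarity to any higher-ranked item is at least 1 - eps. The function identify_duplicates is hypothetical.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def identify_duplicates(ids, embeddings, centroid, eps, which_to_keep="hard"):
    """Return ids flagged as duplicates within ONE cluster (hypothetical sketch)."""
    if which_to_keep not in ("hard", "easy"):
        raise ValueError("this sketch only models 'hard' and 'easy'")
    # Assumed ranking: sort members by cosine distance to the centroid.
    # 'hard' puts the farthest items first so they are the ones kept.
    order = sorted(
        range(len(ids)),
        key=lambda i: 1.0 - cosine_similarity(embeddings[i], centroid),
        reverse=(which_to_keep == "hard"),
    )
    duplicates = []
    for pos, i in enumerate(order):
        # Highest similarity to any higher-ranked item, i.e. the upper
        # triangle of the cluster's pairwise similarity matrix.
        best = max(
            (cosine_similarity(embeddings[i], embeddings[j]) for j in order[:pos]),
            default=-1.0,
        )
        # Assumed rule: similarity >= 1 - eps marks the lower-ranked item.
        if best >= 1.0 - eps:
            duplicates.append(ids[i])
    return duplicates


ids = ["a", "b", "c"]
embeddings = [[1.0, 0.0], [0.999, 0.01], [0.0, 1.0]]  # "a" and "b" are near-identical
centroid = [1.0, 0.5]
print(identify_duplicates(ids, embeddings, centroid, eps=0.01, which_to_keep="hard"))  # ['b']
print(identify_duplicates(ids, embeddings, centroid, eps=0.01, which_to_keep="easy"))  # ['a']
```

Which member of a near-duplicate pair survives depends only on the ranking, which is why which_to_keep changes the output while the set of near-duplicate pairs stays the same.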

Attributes:

cache_kwargs
cache_path = cache_path or output_path
duplicates_output_path = os.path.join(self.output_path, 'duplicates')
kmeans_output_path = os.path.join(self.cache_path, 'kmeans_results')
pairwise_output_path = os.path.join(self.cache_path, 'pairwise_results')
read_kwargs = read_kwargs.copy() if read_kwargs else {}
write_kwargs = write_kwargs.copy() if write_kwargs else {}
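
The attribute defaults above can be reproduced in a few lines, which makes the on-disk layout explicit: intermediate K-means and pairwise results live under cache_path (which falls back to output_path), while duplicates always go under output_path. The helpers derive_paths and copy_kwargs are hypothetical names used only for this sketch.

```python
import os
from typing import Any, Optional


def derive_paths(output_path: str, cache_path: Optional[str] = None) -> dict:
    """Mirror the documented attribute defaults (hypothetical helper)."""
    cache_path = cache_path or output_path
    return {
        "cache_path": cache_path,
        "kmeans_output_path": os.path.join(cache_path, "kmeans_results"),
        "pairwise_output_path": os.path.join(cache_path, "pairwise_results"),
        "duplicates_output_path": os.path.join(output_path, "duplicates"),
    }


def copy_kwargs(kwargs: Optional[dict]) -> dict:
    """Shallow-copy user kwargs so later changes do not leak back to the caller."""
    return kwargs.copy() if kwargs else {}


paths = derive_paths("/out")                    # cache_path defaults to "/out"
split = derive_paths("/out", cache_path="/tmp/semdedup_cache")
options: dict[str, Any] = {"columns": ["id"]}
local = copy_kwargs(options)
local["extra"] = True                           # does not modify `options`
```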
nemo_curator.stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow._log_configuration(
pairwise_executor: nemo_curator.backends.base.BaseExecutor | None = None
) -> None

Log workflow configuration.

nemo_curator.stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow._run_kmeans_stage(
kmeans_executor: nemo_curator.backends.experimental.ray_actor_pool.RayActorPoolExecutor
) -> list[typing.Any]

Run K-means clustering stage (always uses RayActorPoolExecutor).

nemo_curator.stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow._run_pairwise_stage(
pairwise_executor: nemo_curator.backends.base.BaseExecutor | None = None
) -> list[typing.Any]

Run pairwise similarity + duplicate identification stage.

nemo_curator.stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow._setup_directories() -> None

Set up the output directories using fsspec.

nemo_curator.stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow._validate_config() -> None

Validate the configuration.

nemo_curator.stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow.run(
kmeans_executor: nemo_curator.backends.base.BaseExecutor | None = None,
pairwise_executor: nemo_curator.backends.base.BaseExecutor | None = None
) -> nemo_curator.pipeline.workflow.WorkflowRunResult

Run the complete semantic deduplication pipeline.

Parameters:

kmeans_executor
BaseExecutor | None, defaults to None

Executor for the K-means stage. If None, RayActorPoolExecutor() is used.

pairwise_executor
BaseExecutor | None, defaults to None

Executor for the pairwise stage. If None, XennaExecutor() is used.

Returns: WorkflowRunResult

WorkflowRunResult object containing the results and timing information

nemo_curator.stages.deduplication.semantic.workflow.MIN_RECOMMENDED_N_CLUSTERS = 1000
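
The constant is documented without further explanation. A caller who wants to flag small cluster counts before launching a run could compare n_clusters against it, as in this hypothetical check. It is not part of the library, and how the workflow itself uses the constant is not stated here.

```python
import warnings

MIN_RECOMMENDED_N_CLUSTERS = 1000  # value documented for this module


def check_n_clusters(n_clusters: int) -> None:
    """Hypothetical caller-side sanity check; not part of the library."""
    if n_clusters < 1:
        raise ValueError("n_clusters must be a positive integer")
    if n_clusters < MIN_RECOMMENDED_N_CLUSTERS:
        # Warn rather than fail: a small value is allowed but below the recommendation.
        warnings.warn(
            f"n_clusters={n_clusters} is below the recommended minimum of "
            f"{MIN_RECOMMENDED_N_CLUSTERS}",
            stacklevel=2,
        )
```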