nemo_curator.stages.text.deduplication.semantic

Monolithic Text Semantic Deduplication Workflow.

This module contains a complete end-to-end workflow for text semantic deduplication:

  1. Embedding generation from text data
  2. Semantic deduplication using clustering and pairwise similarity
  3. Optional duplicate removal based on identified duplicates
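All three steps are driven from a single dataclass. Below is a minimal, hypothetical usage sketch: the paths are placeholders, every parameter name comes from the signature documented under API, and the behavior of `run()` when no executors are passed (falling back to a default backend) is an assumption.

```python
# Minimal end-to-end sketch. Paths are hypothetical placeholders.
from nemo_curator.stages.text.deduplication.semantic import (
    TextSemanticDeduplicationWorkflow,
)

workflow = TextSemanticDeduplicationWorkflow(
    input_path="/data/corpus",    # hypothetical Parquet input directory
    output_path="/data/deduped",  # hypothetical output directory
    perform_removal=True,         # step 3: drop the identified duplicates
    n_clusters=100,               # k-means cluster count for step 2
    eps=0.01,                     # similarity threshold for duplicate pairs
)

# Assumption: with no executors supplied, run() uses a default backend.
result = workflow.run()
```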

Module Contents

Classes

| Name | Description |
|------|-------------|
| TextSemanticDeduplicationWorkflow | Monolithic workflow for end-to-end text semantic deduplication. |

API

class nemo_curator.stages.text.deduplication.semantic.TextSemanticDeduplicationWorkflow(
input_path: str | list[str],
output_path: str,
cache_path: str | None = None,
perform_removal: bool = True,
text_field: str = 'text',
embedding_field: str = 'embeddings',
model_identifier: str = 'google/embeddinggemma-300m',
embedding_max_chars: int | None = None,
embedding_pretokenize: bool = False,
embedding_vllm_init_kwargs: dict[str, typing.Any] | None = None,
hf_token: str | None = None,
model_cache_dir: str | None = None,
n_clusters: int = 100,
id_field: str = CURATOR_DEDUP_ID_STR,
embedding_dim: int | None = None,
metadata_fields: list[str] | None = None,
distance_metric: typing.Literal['cosine', 'l2'] = 'cosine',
which_to_keep: typing.Literal['hard', 'easy', 'random'] = 'hard',
eps: float | None = 0.01,
kmeans_max_iter: int = 300,
kmeans_tol: float = 0.0001,
kmeans_random_state: int = 42,
kmeans_init: str = 'k-means||',
kmeans_n_init: int | typing.Literal['auto'] = 1,
kmeans_oversampling_factor: float = 2.0,
kmeans_max_samples_per_batch: int = 1 << 15,
ranking_strategy: nemo_curator.stages.deduplication.semantic.ranking.RankingStrategy | None = None,
pairwise_batch_size: int = 1024,
_duplicates_num_row_groups_hint: int | None = None,
use_id_generator: bool = False,
id_generator_state_file: str | None = None,
input_filetype: typing.Literal['jsonl', 'parquet'] = 'parquet',
input_file_extensions: list[str] | None = None,
input_files_per_partition: int | None = None,
input_blocksize: int | None = None,
output_filetype: typing.Literal['jsonl', 'parquet'] = 'parquet',
output_file_extension: str | None = None,
output_fields: list[str] | None = None,
read_kwargs: dict[str, typing.Any] = dict(),
cache_kwargs: dict[str, typing.Any] = dict(),
write_kwargs: dict[str, typing.Any] = dict(),
verbose: bool = True,
clear_output: bool = True
)
Dataclass

Monolithic workflow for end-to-end text semantic deduplication.

This workflow combines:

  1. Text embedding generation (configurable executor)
  2. Semantic deduplication (configurable executor for pairwise stage)
  3. Duplicate removal (configurable executor)

Supports flexible executor configuration: a single executor can drive all stages, or different executors can be assigned to different phases.
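Continuing the module-level sketch above, the two executor-passing styles might look like the following. The executor values are placeholders (use whichever BaseExecutor subclass your backend provides), and the per-phase tuple order of embedding, pairwise, and removal executors is an assumption inferred from the three phases listed.

```python
from nemo_curator.backends.base import BaseExecutor

def run_with_single(executor: BaseExecutor) -> None:
    # One executor reused for the embedding, pairwise, and removal stages.
    workflow.run(streaming_executor=executor)

def run_with_per_phase(
    embed_exec: BaseExecutor,
    pairwise_exec: BaseExecutor,
    removal_exec: BaseExecutor,
) -> None:
    # One executor per phase, passed as a 3-tuple. The phase order shown
    # here is an assumption inferred from the numbered phases above.
    workflow.run(streaming_executor=(embed_exec, pairwise_exec, removal_exec))
```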

_duplicates_num_row_groups_hint
int | None = None
cache_kwargs
dict[str, Any] = field(default_factory=dict)
cache_path
str | None = None
clear_output
bool = True
distance_metric
Literal['cosine', 'l2'] = 'cosine'
embedding_dim
int | None = None
embedding_field
str = 'embeddings'
embedding_max_chars
int | None = None
embedding_pretokenize
bool = False
embedding_vllm_init_kwargs
dict[str, Any] | None = None
eps
float | None = 0.01
hf_token
str | None = None
id_field
str = CURATOR_DEDUP_ID_STR
id_generator_state_file
str | None = None
input_blocksize
int | None = None
input_file_extensions
list[str] | None = None
input_files_per_partition
int | None = None
input_filetype
Literal['jsonl', 'parquet'] = 'parquet'
input_path
str | list[str]
kmeans_init
str = 'k-means||'
kmeans_max_iter
int = 300
kmeans_max_samples_per_batch
int = 1 << 15
kmeans_n_init
int | Literal['auto'] = 1
kmeans_oversampling_factor
float = 2.0
kmeans_random_state
int = 42
kmeans_tol
float = 0.0001
metadata_fields
list[str] | None = None
model_cache_dir
str | None = None
model_identifier
str = 'google/embeddinggemma-300m'
n_clusters
int = 100
output_fields
list[str] | None = None
output_file_extension
str | None = None
output_filetype
Literal['jsonl', 'parquet'] = 'parquet'
output_path
str
pairwise_batch_size
int = 1024
perform_removal
bool = True
ranking_strategy
RankingStrategy | None = None
read_kwargs
dict[str, Any] = field(default_factory=dict)
text_field
str = 'text'
use_id_generator
bool = False
verbose
bool = True
which_to_keep
Literal['hard', 'easy', 'random'] = 'hard'
write_kwargs
dict[str, Any] = field(default_factory=dict)
nemo_curator.stages.text.deduplication.semantic.TextSemanticDeduplicationWorkflow.__post_init__()

Initialize parent class after dataclass initialization.

nemo_curator.stages.text.deduplication.semantic.TextSemanticDeduplicationWorkflow._log_configuration() -> None

Log workflow configuration.

nemo_curator.stages.text.deduplication.semantic.TextSemanticDeduplicationWorkflow._run_duplicate_removal(
executor: nemo_curator.backends.base.BaseExecutor
) -> nemo_curator.pipeline.workflow.WorkflowRunResult | None

Run duplicate removal stage.

nemo_curator.stages.text.deduplication.semantic.TextSemanticDeduplicationWorkflow._run_embedding_generation(
executor: nemo_curator.backends.base.BaseExecutor
) -> list[nemo_curator.tasks.Task]

Run embedding generation stage.

nemo_curator.stages.text.deduplication.semantic.TextSemanticDeduplicationWorkflow._run_semantic_deduplication(
kmeans_executor: nemo_curator.backends.base.BaseExecutor,
pairwise_executor: nemo_curator.backends.base.BaseExecutor
) -> nemo_curator.pipeline.workflow.WorkflowRunResult

Run semantic deduplication stage.

nemo_curator.stages.text.deduplication.semantic.TextSemanticDeduplicationWorkflow._setup_directories() -> None

Set up output directories.

nemo_curator.stages.text.deduplication.semantic.TextSemanticDeduplicationWorkflow._validate_config() -> None

Validate workflow configuration.

nemo_curator.stages.text.deduplication.semantic.TextSemanticDeduplicationWorkflow.run(
streaming_executor: nemo_curator.backends.base.BaseExecutor | tuple[nemo_curator.backends.base.BaseExecutor, nemo_curator.backends.base.BaseExecutor, nemo_curator.backends.base.BaseExecutor] | None = None,
batch_executor: nemo_curator.backends.base.BaseExecutor | None = None
) -> nemo_curator.pipeline.workflow.WorkflowRunResult

Run the complete text semantic deduplication workflow.

Returns: WorkflowRunResult

WorkflowRunResult object containing the results and timing information from all stages.
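A hedged call sketch: the executor variables below are hypothetical placeholders, and the only property of the return value relied upon is the documented fact that it carries per-stage results and timings.

```python
# Hypothetical executors; substitute your backend's BaseExecutor subclasses.
result = workflow.run(
    streaming_executor=my_streaming_exec,  # or a 3-tuple, one per stage
    batch_executor=my_batch_exec,          # optional batch backend
)
# WorkflowRunResult carries results and timing info for all stages; its
# specific attribute names are not documented here, so only print it.
print(result)
```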