nemo_curator.stages.deduplication.semantic.kmeans

View as Markdown

Module Contents

Classes

| Name | Description |
| --- | --- |
| KMeansReadFitWriteStage | KMeans clustering stage that requires RAFT for distributed processing. |
| KMeansStage | KMeans clustering stage that requires RAFT for distributed processing. |

Data

COSINE_DIST_TO_CENT_COL

L2_DIST_TO_CENT_COL

API

class nemo_curator.stages.deduplication.semantic.kmeans.KMeansReadFitWriteStage(
id_field: str,
embedding_field: str,
output_path: str,
filetype: typing.Literal['parquet', 'jsonl'],
n_clusters: int,
metadata_fields: list[str] | None = None,
embedding_dim: int | None = None,
verbose: bool = False,
max_iter: int = 300,
tol: float = 0.0001,
random_state: int = 42,
init: typing.Literal['k-means||', 'random'] | numpy.ndarray = 'k-means||',
n_init: int | typing.Literal['auto'] = 1,
oversampling_factor: float = 2.0,
max_samples_per_batch: int = 1 << 15,
read_kwargs: dict[dict] | None = None,
write_kwargs: dict[dict] | None = None
)

Bases: ProcessingStage[FileGroupTask, _EmptyTask], DeduplicationIO

KMeans clustering stage that requires RAFT for distributed processing.

input_storage_options
= self.read_kwargs.pop('storage_options', None)
metadata_fields
name
= 'KMeansStage'
output_storage_options
= self.write_kwargs.pop('storage_options', None)
read_kwargs
resources
= Resources(cpus=1.0, gpus=1.0)
write_kwargs
nemo_curator.stages.deduplication.semantic.kmeans.KMeansReadFitWriteStage._assign_distances(
df: cudf.DataFrame,
embedding_col: str,
centroids: cupy.ndarray
) -> cudf.DataFrame
staticmethod

Computes the L2 distance from each embedding in the DataFrame to its nearest centroid. The embeddings are normalized; for cosine distance, the centroids need to be normalized as well.

nemo_curator.stages.deduplication.semantic.kmeans.KMeansReadFitWriteStage.normalize_embeddings_col_in_df(
df: cudf.DataFrame,
embedding_col: str
) -> cudf.DataFrame
staticmethod
nemo_curator.stages.deduplication.semantic.kmeans.KMeansReadFitWriteStage.process(
task: nemo_curator.tasks.FileGroupTask
) -> nemo_curator.tasks._EmptyTask
nemo_curator.stages.deduplication.semantic.kmeans.KMeansReadFitWriteStage.process_batch(
tasks: list[nemo_curator.tasks.FileGroupTask]
) -> list[nemo_curator.tasks._EmptyTask]

Process a batch of FileGroupTasks using distributed RAFT KMeans.

In RAFT mode, each actor processes its assigned tasks, but the KMeans model is trained cooperatively across all actors using RAFT communication.

This method:

  1. Reads data from this actor’s assigned tasks
  2. Breaks data into subgroups to avoid cudf row limits
  3. Fits distributed KMeans model (coordinates with other actors via RAFT)
  4. Assigns cluster centroids back to each subgroup
  5. Writes the results for each subgroup
nemo_curator.stages.deduplication.semantic.kmeans.KMeansReadFitWriteStage.ray_stage_spec() -> dict[str, typing.Any]
nemo_curator.stages.deduplication.semantic.kmeans.KMeansReadFitWriteStage.setup(
_: nemo_curator.backends.base.WorkerMetadata | None = None
) -> None
class nemo_curator.stages.deduplication.semantic.kmeans.KMeansStage(
n_clusters: int,
id_field: str,
embedding_field: str,
input_path: str | list[str],
output_path: str,
metadata_fields: list[str] | None = None,
verbose: bool = False,
embedding_dim: int | None = None,
input_filetype: typing.Literal['jsonl', 'parquet'] = 'parquet',
input_file_extensions: list[str] | None = None,
read_kwargs: dict[dict] | None = None,
write_kwargs: dict[dict] | None = None,
max_iter: int = 300,
tol: float = 0.0001,
random_state: int = 42,
init: typing.Literal['k-means||', 'random'] | numpy.ndarray = 'k-means||',
n_init: int | typing.Literal['auto'] = 1,
oversampling_factor: float = 2.0,
max_samples_per_batch: int = 1 << 15
)
Dataclass

Bases: CompositeStage[_EmptyTask, _EmptyTask]

KMeans clustering stage that requires RAFT for distributed processing.

embedding_dim
int | None = None
embedding_field
str
id_field
str
init
Literal['k-means||', 'random'] | ndarray = 'k-means||'
input_file_extensions
list[str] | None = None
input_filetype
Literal['jsonl', 'parquet'] = 'parquet'
input_path
str | list[str]
max_iter
int = 300
max_samples_per_batch
int = 1 << 15

KMeans clustering stage that requires RAFT for distributed processing.

metadata_fields
list[str] | None = None
n_clusters
int
n_init
int | Literal['auto'] = 1
output_path
str
oversampling_factor
float = 2.0
random_state
int = 42
read_kwargs
dict[dict] | None = None
tol
float = 0.0001
verbose
bool = False
write_kwargs
dict[dict] | None = None
nemo_curator.stages.deduplication.semantic.kmeans.KMeansStage.__post_init__()

Initialize parent class after dataclass initialization.

nemo_curator.stages.deduplication.semantic.kmeans.KMeansStage.decompose() -> list[nemo_curator.stages.base.ProcessingStage]
nemo_curator.stages.deduplication.semantic.kmeans.COSINE_DIST_TO_CENT_COL = 'cosine_dist_to_cent'
nemo_curator.stages.deduplication.semantic.kmeans.L2_DIST_TO_CENT_COL = 'l2_dist_to_cent'