stages.deduplication.semantic.kmeans
#
Module Contents#
Classes#
KMeans clustering stage that requires RAFT for distributed processing. |
|
KMeans clustering stage that requires RAFT for distributed processing. |
Data#
API#
- stages.deduplication.semantic.kmeans.COSINE_DIST_TO_CENT_COL#
‘cosine_dist_to_cent’
- class stages.deduplication.semantic.kmeans.KMeansReadFitWriteStage(id_field: str, embedding_field: str, output_path: str, filetype: typing.Literal[parquet, jsonl], n_clusters: int, metadata_fields: list[str] | None = None, embedding_dim: int | None = None, verbose: bool = False, max_iter: int = 300, tol: float = 0.0001, random_state: int = 42, init: typing.Literal[k-means||, random] | numpy.ndarray = 'k-means||', n_init: int | typing.Literal[auto] = 1, oversampling_factor: float = 2.0, max_samples_per_batch: int = 1 << 15, read_kwargs: dict[dict] | None = None, write_kwargs: dict[dict] | None = None)#
Bases:
nemo_curator.stages.base.ProcessingStage
[nemo_curator.tasks.FileGroupTask
,nemo_curator.tasks._EmptyTask
],nemo_curator.stages.deduplication.io_utils.DeduplicationIO
KMeans clustering stage that requires RAFT for distributed processing.
Initialization
KMeans clustering stage that requires RAFT for distributed processing.
Args: id_field (str): The column name of the id column. embedding_field (str): The column name of the embedding column. output_path (str): The path to the output directory. n_clusters (int): The number of clusters to create. metadata_fields (list[str] | None): The columns to keep in the output. These columns can be used later to prioritize deduplication. embedding_dim (int | None): The dimension of the embedding. This helps us read data into smaller chunks. verbose (bool): Whether to print verbose output. max_iter (int): The maximum number of iterations to run. tol (float): Tolerance for stopping criteria of the kmeans algorithm. random_state (int): Seed for the random number generator. Unseeded by default. Does not currently fully guarantee the exact same results. init (Literal[“k-means||”, “random”] | np.ndarray): ‘scalable-k-means++’ or ‘k-means||’: Uses fast and stable scalable kmeans++ initialization. ‘random’: Choose ‘n_cluster’ observations (rows) at random from data for the initial centroids. If an ndarray is passed, it should be of shape (n_clusters, n_features) and gives the initial centers. n_init (int | Literal[“auto”]): Number of times the k-means algorithm will be run with different centroid seeds. The final results will be the best output of n_init consecutive runs in terms of inertia. oversampling_factor (float): The amount of points to sample in scalable k-means++ initialization for potential centroids. Increasing this value can lead to better initial centroids at the cost of memory. The total number of centroids sampled in scalable k-means++ is oversampling_factor * n_clusters * 8. max_samples_per_batch (int): The number of data samples to use for batches of the pairwise distance computation. This computation is done throughout both fit predict. The default should suit most cases. The total number of elements in the batched pairwise distance computation is max_samples_per_batch * n_clusters. It might become necessary to lower this number when n_clusters becomes prohibitively large. read_kwargs (dict[dict]): Keyword arguments for the read stage. write_kwargs (dict[dict]): Keyword arguments for the write stage.
- static normalize_embeddings_col_in_df(
- df: cudf.DataFrame,
- embedding_col: str,
- process(
- task: nemo_curator.tasks.FileGroupTask,
Process a task and return the result. Args: task (X): Input task to process Returns (Y | list[Y]): - Single task: For 1-to-1 transformations - List of tasks: For 1-to-many transformations (e.g., readers) - None: If the task should be filtered out
- process_batch(
- tasks: list[nemo_curator.tasks.FileGroupTask],
Process a batch of FileGroupTasks using distributed RAFT KMeans.
In RAFT mode, each actor processes its assigned tasks, but the KMeans model is trained cooperatively across all actors using RAFT communication.
This method:
Reads data from this actor’s assigned tasks
Breaks data into subgroups to avoid cudf row limits
Fits distributed KMeans model (coordinates with other actors via RAFT)
Assigns cluster centroids back to each subgroup
Writes the results for each subgroup
- ray_stage_spec() dict[str, Any] #
Get Ray configuration for this stage. Note : This is only used for Ray Data which is an experimental backend. The keys are defined in RayStageSpecKeys in backends/experimental/ray_data/utils.py
Returns (dict[str, Any]): Dictionary containing Ray-specific configuration
- setup(
- _: nemo_curator.backends.base.WorkerMetadata | None = None,
Setup method called once before processing begins. Override this method to perform any initialization that should happen once per worker. Args: worker_metadata (WorkerMetadata, optional): Information about the worker (provided by some backends)
- class stages.deduplication.semantic.kmeans.KMeansStage#
Bases:
nemo_curator.stages.base.CompositeStage
[nemo_curator.tasks._EmptyTask
,nemo_curator.tasks._EmptyTask
]KMeans clustering stage that requires RAFT for distributed processing.
Initialization
- decompose() list[nemo_curator.stages.base.ProcessingStage] #
Decompose into execution stages.
This method must be implemented by composite stages to define what low-level stages they represent.
Returns (list[ProcessingStage]): List of execution stages that will actually run
- embedding_dim: int | None#
None
- embedding_field: str#
None
- id_field: str#
None
- init: Literal[k-means||, random] | numpy.ndarray#
‘k-means||’
- input_file_extensions: list[str] | None#
None
- input_filetype: Literal[jsonl, parquet]#
‘parquet’
- input_path: str | list[str]#
None
- max_iter: int#
300
- max_samples_per_batch: int#
None
KMeans clustering stage that requires RAFT for distributed processing.
Args: n_clusters (int): The number of clusters to create. id_field (str): The column name of the id column. embedding_field (str): The column name of the embedding column. input_path (str | list[str]): The path to the input directory. output_path (str): The path to the output directory. metadata_fields (list[str] | None): The columns to keep in the output. These columns can be used later to prioritize deduplication. verbose (bool): Whether to print verbose output. embedding_dim (int | None): The dimension of the embedding. This helps us read data into smaller chunks. input_filetype (Literal[“jsonl”, “parquet”]): The type of the input file read_kwargs (dict[dict]): Keyword arguments for the read stage. write_kwargs (dict[dict]): Keyword arguments for the write stage. max_iter (int): The maximum number of iterations to run. tol (float): Tolerance for stopping criteria of the kmeans algorithm. random_state (int): Seed for the random number generator. Unseeded by default. Does not currently fully guarantee the exact same results. init (Literal[“k-means||”, “random”] | np.ndarray): ‘scalable-k-means++’ or ‘k-means||’: Uses fast and stable scalable kmeans++ initialization. ‘random’: Choose ‘n_cluster’ observations (rows) at random from data for the initial centroids. If an ndarray is passed, it should be of shape (n_clusters, n_features) and gives the initial centers. n_init (int | Literal[“auto”]): Number of times the k-means algorithm will be run with different centroid seeds. The final results will be the best output of n_init consecutive runs in terms of inertia. oversampling_factor (float): The amount of points to sample in scalable k-means++ initialization for potential centroids. Increasing this value can lead to better initial centroids at the cost of memory. The total number of centroids sampled in scalable k-means++ is oversampling_factor * n_clusters * 8. max_samples_per_batch (int): The number of data samples to use for batches of the pairwise distance computation. This computation is done throughout both fit predict. The default should suit most cases. The total number of elements in the batched pairwise distance computation is max_samples_per_batch * n_clusters. It might become necessary to lower this number when n_clusters becomes prohibitively large.
- metadata_fields: list[str] | None#
None
- n_clusters: int#
None
- n_init: int | Literal[auto]#
1
- output_path: str#
None
- oversampling_factor: float#
2.0
- random_state: int#
42
- read_kwargs: dict[dict] | None#
None
- tol: float#
0.0001
- verbose: bool#
False
- write_kwargs: dict[dict] | None#
None
- stages.deduplication.semantic.kmeans.L2_DIST_TO_CENT_COL#
‘l2_dist_to_cent’