
# nemo_curator.stages.deduplication.semantic.workflow

End-to-End Semantic Deduplication Pipeline for Ray Curator.

This module contains the complete semantic deduplication workflow:

1. K-means clustering on embedding data (always uses `RayActorPoolExecutor`)
2. Pairwise similarity computation within clusters, followed by duplicate identification (configurable executor)
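
The two steps can be illustrated with a small, library-free sketch. It is not NeMo Curator's implementation: it takes cluster labels as given instead of running K-means, uses plain Python lists rather than GPU arrays, and assumes that `eps` acts as a cosine-distance threshold, so an item is flagged when its similarity to an earlier item in the same cluster exceeds `1 - eps`. The function names (`cosine_similarity`, `duplicates_in_cluster`, `semantic_duplicates`) are hypothetical.

```python
import math
from collections import defaultdict


def cosine_similarity(u, v):
    """Cosine similarity of two non-zero vectors of equal length."""
    dot = sum(a * b for a, b in zip(u, v))
    return dot / (math.hypot(*u) * math.hypot(*v))


def duplicates_in_cluster(ids, vectors, eps):
    """Flag each item whose similarity to any earlier item exceeds 1 - eps."""
    flagged = []
    for j in range(1, len(ids)):
        # Compare item j only with the items that precede it in the cluster.
        max_sim = max(cosine_similarity(vectors[i], vectors[j]) for i in range(j))
        if max_sim > 1.0 - eps:
            flagged.append(ids[j])
    return flagged


def semantic_duplicates(ids, vectors, labels, eps):
    """Group items by cluster label, then compare only within each cluster."""
    clusters = defaultdict(list)
    for item_id, vec, label in zip(ids, vectors, labels):
        clusters[label].append((item_id, vec))
    flagged = []
    for label in sorted(clusters):
        members = clusters[label]
        flagged.extend(
            duplicates_in_cluster(
                [item_id for item_id, _ in members],
                [vec for _, vec in members],
                eps,
            )
        )
    return flagged


# Toy data: "a" and "b" are near-identical; "c" and "d" are not.
ids = ["a", "b", "c", "d"]
vectors = [[1.0, 0.0], [1.0, 0.01], [0.0, 1.0], [0.5, 1.0]]
labels = [0, 0, 1, 1]  # cluster assignment, assumed to come from step 1
result = semantic_duplicates(ids, vectors, labels, eps=0.01)
print(result)  # ['b']
```

Because comparisons stay inside a cluster, the pairwise cost depends on cluster sizes rather than on the size of the whole dataset, which is presumably why clustering runs first.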

## Module Contents

### Classes

| Name                                                                                                                  | Description                                 |
| --------------------------------------------------------------------------------------------------------------------- | ------------------------------------------- |
| [`SemanticDeduplicationWorkflow`](#nemo_curator-stages-deduplication-semantic-workflow-SemanticDeduplicationWorkflow) | End-to-End Semantic Deduplication Workflow. |

### Data

[`MIN_RECOMMENDED_N_CLUSTERS`](#nemo_curator-stages-deduplication-semantic-workflow-MIN_RECOMMENDED_N_CLUSTERS)

### API

<Anchor id="nemo_curator-stages-deduplication-semantic-workflow-SemanticDeduplicationWorkflow">
  <CodeBlock links={{"nemo_curator.stages.deduplication.semantic.ranking.RankingStrategy":"/nemo-curator/nemo_curator/stages/deduplication/semantic/ranking#nemo_curator-stages-deduplication-semantic-ranking-RankingStrategy"}} showLineNumbers={false} wordWrap={true}>
    ```python
    class nemo_curator.stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow(
        input_path: str | list[str],
        output_path: str,
        n_clusters: int,
        cache_path: str | None = None,
        id_field: str = 'id',
        embedding_field: str = 'embeddings',
        embedding_dim: int | None = None,
        metadata_fields: list[str] | None = None,
        input_filetype: typing.Literal['parquet', 'jsonl'] = 'parquet',
        input_file_extensions: list[str] | None = None,
        max_iter: int = 300,
        tol: float = 0.0001,
        random_state: int = 42,
        init: typing.Literal['k-means||', 'random'] | numpy.ndarray = 'k-means||',
        n_init: int | typing.Literal['auto'] = 1,
        oversampling_factor: float = 2.0,
        max_samples_per_batch: int = 1 << 15,
        distance_metric: typing.Literal['cosine', 'l2'] = 'cosine',
        which_to_keep: typing.Literal['hard', 'easy', 'random'] = 'hard',
        ranking_strategy: nemo_curator.stages.deduplication.semantic.ranking.RankingStrategy | None = None,
        pairwise_batch_size: int = 1024,
        eps: float | None = None,
        _duplicates_num_row_groups_hint: int | None = None,
        read_kwargs: dict[str, typing.Any] | None = None,
        cache_kwargs: dict[str, typing.Any] | None = None,
        write_kwargs: dict[str, typing.Any] | None = None,
        clear_output: bool = True,
        verbose: bool = True
    )
    ```
  </CodeBlock>
</Anchor>

<Indent>
  **Bases:** [WorkflowBase](/nemo-curator/nemo_curator/pipeline/workflow#nemo_curator-pipeline-workflow-WorkflowBase)

  End-to-End Semantic Deduplication Workflow.
  It consists of the following stages:

  * `KMeansStage`:
    Reads the embeddings from `input_path` and clusters them into `n_clusters` clusters.
    Writes the data, partitioned by centroid, under `cache_path` (see `kmeans_output_path`).
  * `PairwiseStage`:
    Takes the output of `KMeansStage` and computes pairwise similarity between all embeddings in each cluster.
    Writes the results under `cache_path` (see `pairwise_output_path`).
  * `IdentifyDuplicatesStage` (optional):
    Identifies duplicates based on the pairwise similarity scores.
    Runs only if `eps` is provided.
    Writes the results under `output_path` (see `duplicates_output_path`).

  <ParamField path="cache_kwargs" />

  <ParamField path="cache_path" type="= cache_path or output_path" />

  <ParamField path="duplicates_output_path" type="= os.path.join(self.output_path, 'duplicates')" />

  <ParamField path="kmeans_output_path" type="= os.path.join(self.cache_path, 'kmeans_results')" />

  <ParamField path="pairwise_output_path" type="= os.path.join(self.cache_path, 'pairwise_results')" />

  <ParamField path="read_kwargs" type="= read_kwargs.copy() if read_kwargs else {}" />

  <ParamField path="write_kwargs" type="= write_kwargs.copy() if write_kwargs else {}" />

  <Anchor id="nemo_curator-stages-deduplication-semantic-workflow-SemanticDeduplicationWorkflow-_log_configuration">
    <CodeBlock links={{"nemo_curator.backends.base.BaseExecutor":"/nemo-curator/nemo_curator/backends/base#nemo_curator-backends-base-BaseExecutor"}} showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow._log_configuration(
          pairwise_executor: nemo_curator.backends.base.BaseExecutor | None = None
      ) -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Log workflow configuration.
  </Indent>

  <Anchor id="nemo_curator-stages-deduplication-semantic-workflow-SemanticDeduplicationWorkflow-_run_kmeans_stage">
    <CodeBlock links={{"nemo_curator.backends.ray_actor_pool.RayActorPoolExecutor":"/nemo-curator/nemo_curator/backends/ray_actor_pool/executor#nemo_curator-backends-ray_actor_pool-executor-RayActorPoolExecutor"}} showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow._run_kmeans_stage(
          kmeans_executor: nemo_curator.backends.ray_actor_pool.RayActorPoolExecutor
      ) -> list[typing.Any]
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Run the K-means clustering stage (always uses `RayActorPoolExecutor`).
  </Indent>

  <Anchor id="nemo_curator-stages-deduplication-semantic-workflow-SemanticDeduplicationWorkflow-_run_pairwise_stage">
    <CodeBlock links={{"nemo_curator.backends.base.BaseExecutor":"/nemo-curator/nemo_curator/backends/base#nemo_curator-backends-base-BaseExecutor"}} showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow._run_pairwise_stage(
          pairwise_executor: nemo_curator.backends.base.BaseExecutor | None = None
      ) -> list[typing.Any]
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Run the pairwise similarity and duplicate identification stage.
  </Indent>

  <Anchor id="nemo_curator-stages-deduplication-semantic-workflow-SemanticDeduplicationWorkflow-_setup_directories">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow._setup_directories() -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Set up the output directories in an fsspec-compatible way.
  </Indent>

  <Anchor id="nemo_curator-stages-deduplication-semantic-workflow-SemanticDeduplicationWorkflow-_validate_config">
    <CodeBlock showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow._validate_config() -> None
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Validate the configuration.
  </Indent>

  <Anchor id="nemo_curator-stages-deduplication-semantic-workflow-SemanticDeduplicationWorkflow-run">
    <CodeBlock links={{"nemo_curator.backends.base.BaseExecutor":"/nemo-curator/nemo_curator/backends/base#nemo_curator-backends-base-BaseExecutor","nemo_curator.pipeline.workflow.WorkflowRunResult":"/nemo-curator/nemo_curator/pipeline/workflow#nemo_curator-pipeline-workflow-WorkflowRunResult"}} showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.stages.deduplication.semantic.workflow.SemanticDeduplicationWorkflow.run(
          kmeans_executor: nemo_curator.backends.base.BaseExecutor | None = None,
          pairwise_executor: nemo_curator.backends.base.BaseExecutor | None = None
      ) -> nemo_curator.pipeline.workflow.WorkflowRunResult
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    Run the complete semantic deduplication pipeline.

    **Parameters:**

    <ParamField path="kmeans_executor" type="BaseExecutor | None" default="None">
      Executor for the K-means stage. Defaults to `RayActorPoolExecutor()`.
    </ParamField>

    <ParamField path="pairwise_executor" type="BaseExecutor | None" default="None">
      Executor for the pairwise stage. Defaults to `XennaExecutor()`.
    </ParamField>

    **Returns:** `WorkflowRunResult`

    A `WorkflowRunResult` object containing the results and timing information.
  </Indent>
</Indent>
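
A usage sketch, assuming NeMo Curator is installed and that the embeddings already exist as Parquet files. The constructor arguments, `run()`, and the derived directory names come from the signatures and attributes above. The helper names (`build_workflow_kwargs`, `derived_paths`, `run_semantic_dedup`), the paths, and the `eps` value are placeholders chosen for illustration.

```python
from __future__ import annotations

import os
from typing import Any


def build_workflow_kwargs(
    input_path: str,
    output_path: str,
    n_clusters: int,
    eps: float | None = None,
    cache_path: str | None = None,
) -> dict[str, Any]:
    """Collect constructor arguments; everything else keeps its documented default."""
    kwargs: dict[str, Any] = {
        "input_path": input_path,
        "output_path": output_path,
        "n_clusters": n_clusters,
        "eps": eps,  # None skips duplicate identification, per the stage list
    }
    if cache_path is not None:
        kwargs["cache_path"] = cache_path
    return kwargs


def derived_paths(output_path: str, cache_path: str | None = None) -> dict[str, str]:
    """Mirror the documented attributes to show where each stage writes."""
    cache = cache_path or output_path
    return {
        "kmeans_output_path": os.path.join(cache, "kmeans_results"),
        "pairwise_output_path": os.path.join(cache, "pairwise_results"),
        "duplicates_output_path": os.path.join(output_path, "duplicates"),
    }


def run_semantic_dedup(kwargs: dict[str, Any]):
    """Build and run the workflow with its default executors."""
    # Imported lazily so the helpers above work without NeMo Curator installed.
    from nemo_curator.stages.deduplication.semantic.workflow import (
        SemanticDeduplicationWorkflow,
    )

    workflow = SemanticDeduplicationWorkflow(**kwargs)
    return workflow.run()


# Placeholder paths and threshold; adjust to the actual dataset.
example_kwargs = build_workflow_kwargs(
    input_path="/data/embeddings",
    output_path="/data/semdedup_out",
    n_clusters=1000,
    eps=0.01,
)
example_paths = derived_paths(example_kwargs["output_path"])
# result = run_semantic_dedup(example_kwargs)  # requires NeMo Curator
```

Leaving `cache_path` unset places the intermediate K-means and pairwise results alongside the final output, since `cache_path` falls back to `output_path`.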

<Anchor id="nemo_curator-stages-deduplication-semantic-workflow-MIN_RECOMMENDED_N_CLUSTERS">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    nemo_curator.stages.deduplication.semantic.workflow.MIN_RECOMMENDED_N_CLUSTERS = 1000
    ```
  </CodeBlock>
</Anchor>
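
This page gives only the value of `MIN_RECOMMENDED_N_CLUSTERS`, not how the workflow uses it. Judging by the name alone, it marks a cluster count below which `n_clusters` is considered small. A caller-side check along those lines might look like the following sketch, in which `check_n_clusters` is a hypothetical helper and the warning behavior is an assumption, not documented library behavior.

```python
import warnings

# Value copied from the documentation above.
MIN_RECOMMENDED_N_CLUSTERS = 1000


def check_n_clusters(n_clusters: int) -> bool:
    """Return True if n_clusters meets the recommended minimum, else warn."""
    if n_clusters <= 0:
        raise ValueError("n_clusters must be a positive integer")
    if n_clusters < MIN_RECOMMENDED_N_CLUSTERS:
        # Assumed behavior: warn rather than fail, since the value is "recommended".
        warnings.warn(
            f"n_clusters={n_clusters} is below the recommended minimum "
            f"of {MIN_RECOMMENDED_N_CLUSTERS}",
            stacklevel=2,
        )
        return False
    return True
```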