API Reference

CompositeStage

The CompositeStage class represents high-level, user-facing stages that decompose into multiple low-level execution stages.

Import

from nemo_curator.stages.base import CompositeStage

When to Use CompositeStage

Use CompositeStage when you need to:

  • Provide a simplified API while maintaining fine-grained execution control
  • Bundle multiple related stages into a single logical operation
  • Handle stages that require different resources (e.g., CPU-based followed by GPU-based)

Class Definition

from dataclasses import dataclass, field
from typing import Generic, TypeVar

# ProcessingStage, InputT, and OutputT are defined elsewhere in the library (not shown).
@dataclass
class CompositeStage(ProcessingStage[InputT, OutputT]):
    """High-level stage that decomposes into multiple execution stages.

    Composite stages are decomposed during pipeline planning, allowing
    each sub-stage to run with its own resource requirements.

    Attributes:
        stages: List of constituent ProcessingStage instances.
    """

    stages: list[ProcessingStage] = field(default_factory=list)

Abstract Methods

decompose()

Return the list of stages this composite decomposes into.

def decompose(self) -> list[ProcessingStage]:
    """Decompose into constituent execution stages.

    Returns:
        List of ProcessingStage instances to execute.
    """
    return self.stages

Creating a CompositeStage

from dataclasses import dataclass, field
from nemo_curator.stages.base import CompositeStage, ProcessingStage
from nemo_curator.tasks import Task

# StageA, StageB, and StageC are placeholder stages defined elsewhere.

@dataclass
class MyCompositeStage(CompositeStage[Task, Task]):
    """A composite stage that bundles multiple operations."""

    name: str = "MyCompositeStage"
    param1: str = ""
    param2: int = 0

    def __post_init__(self) -> None:
        super().__init__()
        # Build the sub-stages from this composite's own parameters
        self.stages = [
            StageA(param1=self.param1),
            StageB(param2=self.param2),
            StageC(),
        ]

    def inputs(self) -> tuple[list[str], list[str]]:
        # Return first stage's inputs
        return self.stages[0].inputs()

    def outputs(self) -> tuple[list[str], list[str]]:
        # Return last stage's outputs
        return self.stages[-1].outputs()

    def decompose(self) -> list[ProcessingStage]:
        return self.stages

Configuration with with_()

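CompositeStage uses a dictionary-based with_() signature to configure individual sub-stages. Each key is a sub-stage name and each value is a dictionary of settings to apply to that sub-stage: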

from nemo_curator.stages.resources import Resources

composite_stage = MyCompositeStage(param1="value", param2=10)

# Configure individual stages within the composite
stage_config = {
    "StageA": {"resources": Resources(cpus=4.0)},
    "StageB": {"resources": Resources(cpus=2.0, gpus=1.0)},
}
configured_stage = composite_stage.with_(stage_config)
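
To make the name-keyed lookup concrete, the following is a minimal, self-contained sketch of how a dictionary-based with_() could apply per-stage overrides. It does not use NeMo Curator: SketchStage and SketchComposite are hypothetical stand-ins, the cpus and gpus fields replace the real Resources object, and returning a modified copy (rather than mutating in place) is an assumption, not confirmed library behavior.

```python
from dataclasses import dataclass, field, replace
from typing import Any


@dataclass
class SketchStage:
    """Hypothetical stand-in for a leaf stage (not the NeMo Curator class)."""

    name: str
    cpus: float = 1.0
    gpus: float = 0.0


@dataclass
class SketchComposite:
    """Hypothetical stand-in for a composite stage."""

    stages: list[SketchStage] = field(default_factory=list)

    def with_(self, stage_config: dict[str, dict[str, Any]]) -> "SketchComposite":
        # Reject keys that do not match any sub-stage name.
        known = {stage.name for stage in self.stages}
        unknown = set(stage_config) - known
        if unknown:
            raise ValueError(f"Unknown stage names: {sorted(unknown)}")
        # Copy each sub-stage, applying overrides only where a name matches.
        updated = [
            replace(stage, **stage_config.get(stage.name, {}))
            for stage in self.stages
        ]
        return SketchComposite(stages=updated)


composite = SketchComposite(stages=[SketchStage("StageA"), SketchStage("StageB")])
configured = composite.with_(
    {"StageA": {"cpus": 4.0}, "StageB": {"cpus": 2.0, "gpus": 1.0}}
)
```

Because overrides are looked up by name, two sub-stages sharing a name could not be configured independently in this sketch, which is one way to see why unique names matter.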

Important Rules

  1. No nested composites: every stage returned by decompose() must be a leaf ProcessingStage, not another CompositeStage.
  2. inputs() returns the first stage’s inputs: these are the composite’s input requirements.
  3. outputs() returns the last stage’s outputs: these are what the composite produces.
  4. Unique stage names: all stages returned by decompose() must have unique names so that with_() can target each one.
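
Rules 1 and 4 can be checked mechanically. The sketch below shows one way to do so; it is illustrative only and does not reflect how NeMo Curator validates stages. LeafStage, Composite, and check_decomposition are hypothetical names introduced for this example.

```python
from collections import Counter


class LeafStage:
    """Hypothetical stand-in for a leaf ProcessingStage."""

    def __init__(self, name: str) -> None:
        self.name = name


class Composite(LeafStage):
    """Hypothetical stand-in for a CompositeStage."""

    def __init__(self, name: str, stages: list) -> None:
        super().__init__(name)
        self.stages = stages

    def decompose(self) -> list:
        return self.stages


def check_decomposition(composite: Composite) -> list[str]:
    """Return a list of rule violations; the list is empty when the rules hold."""
    stages = composite.decompose()
    problems = []
    # Rule 1: decomposed stages must be leaves, not composites.
    for stage in stages:
        if isinstance(stage, Composite):
            problems.append(f"nested composite: {stage.name}")
    # Rule 4: names must be unique so each stage can be addressed individually.
    counts = Counter(stage.name for stage in stages)
    for name, count in counts.items():
        if count > 1:
            problems.append(f"duplicate stage name: {name}")
    return problems


good = Composite("ok", [LeafStage("StageA"), LeafStage("StageB")])
bad = Composite("bad", [LeafStage("StageA"), LeafStage("StageA"), Composite("inner", [])])
```

Calling check_decomposition(good) returns an empty list, while check_decomposition(bad) reports both the nested composite and the duplicated name.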

Usage in Pipelines

from nemo_curator.pipeline import Pipeline

# Composite stages are automatically decomposed during build
pipeline = Pipeline(
    name="my_pipeline",
    stages=[
        MyCompositeStage(param1="test", param2=5),
        AnotherStage(),
    ],
)

# The pipeline.build() method decomposes composites
# decomposition_info tracks the mapping
results = pipeline.run()
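
As a rough illustration of what decomposition during build could look like, the sketch below expands composites into a flat list of execution stages and records which composite each group came from. This is not the library's build() implementation: SketchStage, SketchComposite, and flatten_stages are hypothetical, and the dictionary shape used for decomposition_info here (composite name mapped to sub-stage names) is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class SketchStage:
    """Hypothetical stand-in for a leaf stage."""

    name: str


@dataclass
class SketchComposite(SketchStage):
    """Hypothetical stand-in for a composite stage."""

    stages: list[SketchStage] = field(default_factory=list)

    def decompose(self) -> list[SketchStage]:
        return self.stages


def flatten_stages(stages: list[SketchStage]) -> tuple[list[SketchStage], dict[str, list[str]]]:
    """Expand composites in order and record the composite-to-sub-stage mapping."""
    execution_stages: list[SketchStage] = []
    decomposition_info: dict[str, list[str]] = {}
    for stage in stages:
        if isinstance(stage, SketchComposite):
            sub_stages = stage.decompose()
            execution_stages.extend(sub_stages)
            decomposition_info[stage.name] = [sub.name for sub in sub_stages]
        else:
            # Leaf stages pass through unchanged.
            execution_stages.append(stage)
    return execution_stages, decomposition_info


composite = SketchComposite(
    name="MyCompositeStage",
    stages=[SketchStage("StageA"), SketchStage("StageB"), SketchStage("StageC")],
)
execution_stages, decomposition_info = flatten_stages([composite, SketchStage("AnotherStage")])
```

The flattened list preserves pipeline order, so the composite's sub-stages run where the composite was declared, followed by the remaining stages.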
