API Reference

Pipeline

The Pipeline class is the main orchestrator for executing sequences of processing stages in NeMo Curator.

Import

from nemo_curator.pipeline import Pipeline

Class Definition

class Pipeline:
    """User-facing pipeline definition for composing processing stages."""

    def __init__(
        self,
        name: str,
        description: str | None = None,
        stages: list[ProcessingStage] | None = None,
        config: dict[str, Any] | None = None,
    ) -> None:
        """Initialize a pipeline.

        Args:
            name: Name identifier for the pipeline.
            description: Optional description of the pipeline's purpose.
            stages: Optional list of processing stages to include.
            config: Optional pipeline configuration valid across all executors.
        """

Methods

add_stage()

Add a stage to the pipeline.

def add_stage(self, stage: ProcessingStage) -> "Pipeline":
    """Add a processing stage to the pipeline.

    Args:
        stage: The ProcessingStage to add.

    Returns:
        Self for method chaining.
    """
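
Returning self is what makes chained calls possible. The following is a minimal sketch of that pattern only; MiniPipeline and its string stages are hypothetical stand-ins and not the NeMo Curator implementation.

```python
from __future__ import annotations


class MiniPipeline:
    """Hypothetical stand-in, not the NeMo Curator Pipeline class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.stages: list[str] = []

    def add_stage(self, stage: str) -> MiniPipeline:
        # Append the stage, then return self so calls can be chained.
        self.stages.append(stage)
        return self


pipeline = MiniPipeline(name="demo")
chained = pipeline.add_stage("read").add_stage("filter").add_stage("write")
```

Because each call returns the same object, chained and pipeline refer to one pipeline holding all three stages in insertion order.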

build()

Build an execution plan from the pipeline. Decomposes composite stages into execution stages and updates the pipeline in place.

def build(self) -> None:
    """Build the execution plan by decomposing composite stages.

    Raises:
        ValueError: If the pipeline has no stages.
    """
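
To make "decomposing composite stages" concrete, here is a rough sketch of one way such flattening could work. Stage, CompositeStage, decompose(), and the free function build() are all assumptions made for illustration; the library's actual classes and decomposition logic may differ.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Stage:
    """Hypothetical leaf stage that can execute directly."""

    name: str

    def decompose(self) -> list[Stage]:
        return [self]


@dataclass
class CompositeStage(Stage):
    """Hypothetical stage that wraps several child stages."""

    children: list[Stage] = field(default_factory=list)

    def decompose(self) -> list[Stage]:
        # Recurse so nested composites are flattened too.
        return [leaf for child in self.children for leaf in child.decompose()]


def build(stages: list[Stage]) -> None:
    """Flatten composite stages, mutating the list in place."""
    if not stages:
        raise ValueError("Pipeline has no stages")
    stages[:] = [leaf for stage in stages for leaf in stage.decompose()]


stages = [
    Stage("read"),
    CompositeStage("clean", children=[Stage("dedupe"), Stage("filter")]),
    Stage("write"),
]
build(stages)
stage_names = [stage.name for stage in stages]
```

The slice assignment replaces the contents of the same list object, which mirrors the "updates the pipeline in place" behavior described above.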

run()

Execute the pipeline. Calls build() before execution.

def run(
    self,
    executor: BaseExecutor | None = None,
    initial_tasks: list[Task] | None = None,
) -> list[Task] | None:
    """Execute the pipeline.

    Args:
        executor: Executor to use. Defaults to XennaExecutor.
        initial_tasks: Initial tasks to start the pipeline.

    Returns:
        List of output tasks from the final stage, or None.
    """
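
The control flow described here (build first, fall back to a default executor, return a list or None) can be sketched as follows. EchoExecutor, its execute() method, and MiniPipeline are hypothetical names used only for illustration; they are not the BaseExecutor or XennaExecutor interfaces.

```python
from __future__ import annotations


class EchoExecutor:
    """Hypothetical executor that passes tasks through each stage in order."""

    def execute(self, stages, initial_tasks):
        tasks = list(initial_tasks or [])
        for stage in stages:
            tasks = [stage(task) for task in tasks]
        # Return None rather than an empty list when nothing was produced.
        return tasks or None


class MiniPipeline:
    """Hypothetical stand-in, not the NeMo Curator Pipeline class."""

    def __init__(self, stages):
        self.stages = list(stages)
        self.built = False

    def build(self) -> None:
        if not self.stages:
            raise ValueError("Pipeline has no stages")
        self.built = True

    def run(self, executor=None, initial_tasks=None):
        # Build first, mirroring the documented "calls build() before execution".
        self.build()
        # Fall back to a default executor when the caller passes none.
        executor = executor or EchoExecutor()
        return executor.execute(self.stages, initial_tasks)


pipeline = MiniPipeline(stages=[str.strip, str.upper])
results = pipeline.run(initial_tasks=["  hello ", "world  "])
empty = pipeline.run()
```

Since the return type is list[Task] | None, callers should check for None before iterating over the result.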

describe()

Get a detailed description of the pipeline.

def describe(self) -> str:
    """Get detailed description of pipeline stages and requirements.

    Returns:
        Human-readable description of the pipeline.
    """
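
The exact text that describe() returns is not specified on this page. As an illustration of the kind of summary such a method might produce, here is a small sketch; the standalone describe() function and its output layout are assumptions, not the library's format.

```python
from __future__ import annotations


def describe(name: str, stages: list[str], description: str | None = None) -> str:
    """Return a human-readable, multi-line summary (hypothetical format)."""
    lines = [f"Pipeline: {name}"]
    if description:
        lines.append(f"Description: {description}")
    lines.append(f"Stages ({len(stages)}):")
    # Number the stages from 1 so the order of execution is easy to read.
    lines.extend(f"  {index}. {stage}" for index, stage in enumerate(stages, start=1))
    return "\n".join(lines)


summary = describe("text_curation", ["ParquetReader", "LengthFilter", "ParquetWriter"])
print(summary)
```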

Usage Examples

Basic Pipeline

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io import ParquetReader, ParquetWriter
from nemo_curator.stages.text.filters import LengthFilter

# Create pipeline with stages
pipeline = Pipeline(
    name="text_curation",
    description="Basic text curation pipeline",
    stages=[
        ParquetReader(input_path="/data/input"),
        LengthFilter(min_length=100, max_length=10000),
        ParquetWriter(output_path="/data/output"),
    ],
)

# Run the pipeline
results = pipeline.run()

Method Chaining

# stage1, stage2, and stage3 are placeholders for ProcessingStage instances
pipeline = Pipeline(name="my_pipeline")
pipeline.add_stage(stage1).add_stage(stage2).add_stage(stage3)
results = pipeline.run()

Custom Executor

from nemo_curator.backends.xenna import XennaExecutor

# `pipeline` is a Pipeline instance created as in the Basic Pipeline example
executor = XennaExecutor(config={"execution_mode": "streaming"})
results = pipeline.run(executor=executor)

Pipeline Configuration

pipeline = Pipeline(
    name="my_pipeline",
    config={"custom_key": "value"},
)
