---
title: Pipeline
description: >-
  API reference for the Pipeline class - the main orchestrator for executing
  sequences of processing stages
---

The `Pipeline` class is the main orchestrator for executing sequences of processing stages in NeMo Curator.

## Import

```python
from nemo_curator.pipeline import Pipeline
```

## Class Definition

```python
class Pipeline:
    """User-facing pipeline definition for composing processing stages."""

    def __init__(
        self,
        name: str,
        description: str | None = None,
        stages: list[ProcessingStage] | None = None,
        config: dict[str, Any] | None = None,
    ) -> None:
        """Initialize a pipeline.

        Args:
            name: Name identifier for the pipeline.
            description: Optional description of the pipeline's purpose.
            stages: Optional list of processing stages to include.
            config: Optional pipeline configuration valid across all executors.
        """
```

## Methods

### `add_stage()`

Add a stage to the pipeline.

```python
def add_stage(self, stage: ProcessingStage) -> "Pipeline":
    """Add a processing stage to the pipeline.

    Args:
        stage: The ProcessingStage to add.

    Returns:
        Self for method chaining.
    """
```
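
Because `add_stage()` returns `self`, calls can be chained. The self-contained sketch below shows that pattern with hypothetical `ToyStage` and `ToyPipeline` classes that stand in for `ProcessingStage` and `Pipeline`. It only illustrates the chaining contract and is not NeMo Curator's implementation.

```python
from dataclasses import dataclass, field


@dataclass
class ToyStage:
    """Hypothetical stand-in for a ProcessingStage; it only carries a name."""
    name: str


@dataclass
class ToyPipeline:
    """Hypothetical stand-in for Pipeline, reduced to stage bookkeeping."""
    name: str
    stages: list = field(default_factory=list)

    def add_stage(self, stage: ToyStage) -> "ToyPipeline":
        # Store the stage, then return the same object so calls can chain.
        self.stages.append(stage)
        return self


pipeline = ToyPipeline(name="demo")
same = pipeline.add_stage(ToyStage("read")).add_stage(ToyStage("filter"))
print(same is pipeline)                    # True
print([s.name for s in pipeline.stages])   # ['read', 'filter']
```

Every call in the chain mutates and returns the same object, so the chained form and separate `add_stage()` calls produce the same pipeline.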

### `build()`

Build an execution plan from the pipeline. Decomposes composite stages into execution stages and updates the pipeline in place.

```python
def build(self) -> None:
    """Build the execution plan by decomposing composite stages.

    Raises:
        ValueError: If the pipeline has no stages.
    """
```
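
One way to picture decomposition is to recursively replace each composite stage with the stages it expands to, until only directly executable stages remain. The sketch below models this with hypothetical `Stage` and `Composite` classes and a `decompose()` method. These names and the recursive strategy are assumptions for illustration, not the library's code. Like the documented `build()`, the sketch raises `ValueError` when there are no stages.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Stage:
    """Hypothetical directly executable stage."""
    name: str


@dataclass
class Composite:
    """Hypothetical composite stage that expands into other stages."""
    name: str
    parts: list[Stage | Composite]

    def decompose(self) -> list[Stage | Composite]:
        return self.parts


def _expand(stage: Stage | Composite) -> list[Stage]:
    # Recurse so composites nested inside composites are fully expanded.
    if isinstance(stage, Composite):
        result: list[Stage] = []
        for part in stage.decompose():
            result.extend(_expand(part))
        return result
    return [stage]


def build(stages: list[Stage | Composite]) -> list[Stage]:
    """Flatten a stage list into executable stages (sketch)."""
    if not stages:
        raise ValueError("Pipeline has no stages")
    flat: list[Stage] = []
    for stage in stages:
        flat.extend(_expand(stage))
    return flat


dedup = Composite("dedup", [Stage("hash"), Composite("group", [Stage("bucket"), Stage("drop")])])
plan = build([Stage("read"), dedup, Stage("write")])
print([s.name for s in plan])  # ['read', 'hash', 'bucket', 'drop', 'write']
```

Order is preserved: each composite's expansion is spliced in at the position where the composite appeared.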

### `run()`

Execute the pipeline. `run()` calls `build()` before execution, so you do not need to call `build()` yourself.

```python
def run(
    self,
    executor: BaseExecutor | None = None,
    initial_tasks: list[Task] | None = None,
) -> list[Task] | None:
    """Execute the pipeline.

    Args:
        executor: Executor to use. Defaults to XennaExecutor.
        initial_tasks: Initial tasks to start the pipeline.

    Returns:
        List of output tasks from the final stage, or None.
    """
```
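
The documented flow of `run()` is to build the plan first and then execute it, falling back to a default executor when none is passed. The sketch below models that control flow with hypothetical `ToyExecutor` and `ToyPipeline` classes. It makes two simplifying assumptions: a task is a plain `dict`, and each stage is a function that maps a list of tasks to a new list. A real executor such as `XennaExecutor` schedules work across workers rather than looping in-process.

```python
from __future__ import annotations

from typing import Callable

# Hypothetical simplifications: a task is a dict, and a stage is a function
# that maps a list of tasks to a new list of tasks.
Task = dict
StageFn = Callable[[list], list]


class ToyExecutor:
    """Hypothetical executor that runs stages one after another in-process."""

    def execute(self, stages: list[StageFn], initial_tasks: list[Task] | None) -> list[Task]:
        tasks = list(initial_tasks or [])
        for stage in stages:
            tasks = stage(tasks)
        return tasks


class ToyPipeline:
    """Hypothetical pipeline that mirrors the documented run() flow."""

    def __init__(self, name: str, stages: list[StageFn] | None = None) -> None:
        self.name = name
        self.stages = list(stages or [])
        self.built = False

    def build(self) -> None:
        if not self.stages:
            raise ValueError("Pipeline has no stages")
        self.built = True

    def run(
        self,
        executor: ToyExecutor | None = None,
        initial_tasks: list[Task] | None = None,
    ) -> list[Task]:
        self.build()  # build() always runs before execution
        if executor is None:
            executor = ToyExecutor()  # stands in for the real default executor
        return executor.execute(self.stages, initial_tasks)


def double(tasks: list[Task]) -> list[Task]:
    return [{"value": t["value"] * 2} for t in tasks]


def keep_large(tasks: list[Task]) -> list[Task]:
    return [t for t in tasks if t["value"] > 2]


pipeline = ToyPipeline("demo", stages=[double, keep_large])
print(pipeline.run(initial_tasks=[{"value": 1}, {"value": 2}, {"value": 3}]))
# [{'value': 4}, {'value': 6}]
```

Because `build()` runs inside `run()`, an empty pipeline fails before any executor is created.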

### `describe()`

Return a human-readable description of the pipeline's stages and their requirements.

```python
def describe(self) -> str:
    """Get detailed description of pipeline stages and requirements.

    Returns:
        Human-readable description of the pipeline.
    """
```

## Usage Examples

### Basic Pipeline

```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io import ParquetReader, ParquetWriter
from nemo_curator.stages.text.filters import LengthFilter

# Create pipeline with stages
pipeline = Pipeline(
    name="text_curation",
    description="Basic text curation pipeline",
    stages=[
        ParquetReader(input_path="/data/input"),
        LengthFilter(min_length=100, max_length=10000),
        ParquetWriter(output_path="/data/output"),
    ],
)

# Run the pipeline
results = pipeline.run()
```

### Method Chaining

```python
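from nemo_curator.pipeline import Pipeline

# stage1, stage2, and stage3 are ProcessingStage instances created elsewhere
pipeline = Pipeline(name="my_pipeline")
pipeline.add_stage(stage1).add_stage(stage2).add_stage(stage3)
results = pipeline.run()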
```

### Custom Executor

```python
from nemo_curator.backends.xenna import XennaExecutor

# `pipeline` is a Pipeline built as in the examples above
executor = XennaExecutor(config={"execution_mode": "streaming"})
results = pipeline.run(executor=executor)
```

### Pipeline Configuration

```python
pipeline = Pipeline(
    name="my_pipeline",
    config={"custom_key": "value"},
)
```

## Source Code

[View source on GitHub](https://github.com/NVIDIA-NeMo/Curator/blob/main/nemo_curator/pipeline/pipeline.py)
