---
title: Pipeline
description: >-
  API reference for the Pipeline class - the main orchestrator for executing
  sequences of processing stages
---

The `Pipeline` class is the main orchestrator for executing sequences of processing stages in NeMo Curator.

## Import

```python
from nemo_curator.pipeline import Pipeline
```

## Class Definition

```python
class Pipeline:
    """User-facing pipeline definition for composing processing stages."""

    def __init__(
        self,
        name: str,
        description: str | None = None,
        stages: list[ProcessingStage] | None = None,
        config: dict[str, Any] | None = None,
    ) -> None:
        """Initialize a pipeline.

        Args:
            name: Name identifier for the pipeline.
            description: Optional description of the pipeline's purpose.
            stages: Optional list of processing stages to include.
            config: Optional pipeline configuration valid across all executors.
        """
```

## Methods

### `add_stage()`

Add a stage to the pipeline. The method returns the pipeline itself, so calls can be chained.

```python
def add_stage(self, stage: ProcessingStage) -> "Pipeline":
    """Add a processing stage to the pipeline.

    Args:
        stage: The ProcessingStage to add.

    Returns:
        Self for method chaining.
    """
```

### `build()`

Build an execution plan from the pipeline. Composite stages are decomposed into execution stages, and the pipeline's stage list is updated in place.

```python
def build(self) -> None:
    """Build the execution plan by decomposing composite stages.

    Raises:
        ValueError: If the pipeline has no stages.
    """
```

### `run()`

Execute the pipeline. `run()` calls `build()` automatically before execution, so you do not need to call `build()` yourself.

```python
def run(
    self,
    executor: BaseExecutor | None = None,
    initial_tasks: list[Task] | None = None,
) -> list[Task] | None:
    """Execute the pipeline.

    Args:
        executor: Executor to use. Defaults to XennaExecutor.
        initial_tasks: Initial tasks to start the pipeline.

    Returns:
        List of output tasks from the final stage, or None.
    """
```

### `describe()`

Get a human-readable description of the pipeline's stages and their requirements.
```python
def describe(self) -> str:
    """Get detailed description of pipeline stages and requirements.

    Returns:
        Human-readable description of the pipeline.
    """
```

## Usage Examples

### Basic Pipeline

```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io import ParquetReader, ParquetWriter
from nemo_curator.stages.text.filters import LengthFilter

# Create a pipeline with its stages defined up front
pipeline = Pipeline(
    name="text_curation",
    description="Basic text curation pipeline",
    stages=[
        ParquetReader(input_path="/data/input"),
        LengthFilter(min_length=100, max_length=10000),
        ParquetWriter(output_path="/data/output"),
    ],
)

# Run the pipeline (no executor given, so the default XennaExecutor is used)
results = pipeline.run()
```

### Method Chaining

```python
pipeline = Pipeline(name="my_pipeline")

# stage1, stage2 and stage3 are placeholders for ProcessingStage instances;
# add_stage() returns the pipeline, so the calls can be chained
pipeline.add_stage(stage1).add_stage(stage2).add_stage(stage3)
results = pipeline.run()
```

### Custom Executor

```python
from nemo_curator.backends.xenna import XennaExecutor

# Pass an explicitly configured executor instead of relying on the default
executor = XennaExecutor(config={"execution_mode": "streaming"})
results = pipeline.run(executor=executor)
```

### Pipeline Configuration

```python
pipeline = Pipeline(
    name="my_pipeline",
    config={"custom_key": "value"},
)
```

## Source Code

[View source on GitHub](https://github.com/NVIDIA-NeMo/Curator/blob/main/nemo_curator/pipeline/pipeline.py)
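The linked file is the authoritative implementation. As a rough mental model only, here is a hypothetical, dependency-free sketch that mirrors the contract documented on this page: `add_stage()` returns `self`, `build()` raises `ValueError` on an empty pipeline and expands composite stages in place, and `run()` calls `build()` before handing the stages to an executor. `MiniPipeline`, `Stage`, `CompositeStage`, `Task` and `SequentialExecutor` are illustrative names invented for this sketch. They are not NeMo Curator classes, and the real decomposition and execution logic is considerably more involved.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Task:
    """Hypothetical stand-in for a task: just wraps a payload."""
    data: Any


class Stage:
    """Hypothetical execution stage that applies a function to each task."""

    def __init__(self, name: str, fn: Callable[[Any], Any]) -> None:
        self.name = name
        self.fn = fn

    def process(self, task: Task) -> Task:
        return Task(self.fn(task.data))


class CompositeStage:
    """Hypothetical composite stage that expands into simpler stages."""

    def __init__(self, name: str, parts: list[Stage]) -> None:
        self.name = name
        self.parts = parts

    def decompose(self) -> list[Stage]:
        return list(self.parts)


class SequentialExecutor:
    """Hypothetical executor: runs every stage over every task, in order."""

    def execute(self, stages: list[Stage], initial_tasks: list[Task] | None) -> list[Task]:
        tasks = list(initial_tasks or [])
        for stage in stages:
            tasks = [stage.process(t) for t in tasks]
        return tasks


class MiniPipeline:
    """Simplified stand-in mirroring the documented Pipeline contract."""

    def __init__(self, name: str, description: str | None = None,
                 stages: list | None = None, config: dict[str, Any] | None = None) -> None:
        self.name = name
        self.description = description
        self.stages = list(stages or [])
        self.config = dict(config or {})

    def add_stage(self, stage) -> MiniPipeline:
        self.stages.append(stage)
        return self  # returning self is what enables method chaining

    def build(self) -> None:
        if not self.stages:
            raise ValueError(f"Pipeline '{self.name}' has no stages")
        expanded: list[Stage] = []
        for stage in self.stages:
            if isinstance(stage, CompositeStage):
                expanded.extend(stage.decompose())  # composite -> execution stages
            else:
                expanded.append(stage)
        self.stages = expanded  # the pipeline is updated in place

    def describe(self) -> str:
        lines = [f"Pipeline: {self.name}", f"Stages: {len(self.stages)}"]
        lines += [f"  {i}. {s.name}" for i, s in enumerate(self.stages, start=1)]
        return "\n".join(lines)

    def run(self, executor=None, initial_tasks: list[Task] | None = None) -> list[Task]:
        self.build()  # run() always builds first
        if executor is None:
            executor = SequentialExecutor()  # stand-in for the XennaExecutor default
        return executor.execute(self.stages, initial_tasks)


# Usage: one plain stage plus one composite stage, added by chaining
pipeline = MiniPipeline(name="demo")
pipeline.add_stage(Stage("strip", str.strip)).add_stage(
    CompositeStage("normalize", [
        Stage("lower", str.lower),
        Stage("collapse", lambda s: " ".join(s.split())),
    ])
)
results = pipeline.run(initial_tasks=[Task("  Hello   World  ")])
print([t.data for t in results])  # ['hello world']
print(pipeline.describe())
```

In this sketch, because `build()` replaces `stages` with the expanded list, inspecting `pipeline.stages` after `run()` shows the three execution stages (`strip`, `lower`, `collapse`) rather than the two stages that were added, and calling `build()` again is harmless since no composites remain.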