---
title: ProcessingStage
description: >-
  API reference for the ProcessingStage base class - the foundation for all data processing in NeMo Curator
---

The `ProcessingStage` class is the base class for all data processing stages in NeMo Curator. Each stage defines a single step in a data curation pipeline.

## Import

```python
from nemo_curator.stages.base import ProcessingStage
```

## Class Definition

```python
from dataclasses import dataclass, field
from typing import Generic, TypeVar

from nemo_curator.stages.resources import Resources
from nemo_curator.tasks import Task

InputT = TypeVar("InputT", bound=Task)
OutputT = TypeVar("OutputT", bound=Task)


@dataclass
class ProcessingStage(Generic[InputT, OutputT]):
    """Base class for all processing stages.

    Type Parameters:
        InputT: The input task type this stage accepts.
        OutputT: The output task type this stage produces.

    Class Attributes:
        name: String identifier for the stage.
        resources: Resources configuration (CPUs, GPUs).
        batch_size: Number of tasks to process per batch.
    """

    name: str = "ProcessingStage"
    resources: Resources = field(default_factory=lambda: Resources(cpus=1.0))
    batch_size: int = 1
```

## Abstract Methods

### `inputs()`

Declare the task and data attributes this stage requires.

```python
def inputs(self) -> tuple[list[str], list[str]]:
    """Define required task and data attributes.

    Returns:
        Tuple of (required_task_attributes, required_data_attributes).
    """
```

### `outputs()`

Declare the task and data attributes this stage produces.

```python
def outputs(self) -> tuple[list[str], list[str]]:
    """Define output task and data attributes.

    Returns:
        Tuple of (output_task_attributes, output_data_attributes).
    """
```

### `process()`

Process a single task.

```python
def process(self, task: InputT) -> OutputT | list[OutputT] | None:
    """Process a single task.

    Args:
        task: The input task to process.

    Returns:
        - Single task: For 1-to-1 transformations
        - List of tasks: For splitting/reading operations
        - None: To filter out the task
    """
```

## Optional Lifecycle Methods

### `setup_on_node()`

Node-level initialization (e.g., download models).

```python
def setup_on_node(
    self,
    node_info: NodeInfo,
    worker_metadata: dict[str, Any],
) -> None:
    """Initialize resources on a compute node.

    Called once per node before any workers start.
    """
```

### `setup()`

Worker-level initialization (e.g., load models).

```python
def setup(self, worker_metadata: dict[str, Any]) -> None:
    """Initialize resources for a worker.

    Called once per worker before processing begins.
    """
```

### `teardown()`

Cleanup after processing.

```python
def teardown(self) -> None:
    """Clean up resources after processing completes."""
```

### `process_batch()`

Vectorized batch processing for better performance.

```python
def process_batch(self, tasks: list[InputT]) -> list[OutputT | None]:
    """Process a batch of tasks.

    Override for vectorized operations.

    Args:
        tasks: List of input tasks.

    Returns:
        List of output tasks (None entries are filtered out).
    """
```

## Creating Custom Stages

```python
from dataclasses import dataclass, field

from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.resources import Resources
from nemo_curator.tasks import DocumentBatch


@dataclass
class MyCustomStage(ProcessingStage[DocumentBatch, DocumentBatch]):
    """Custom stage that processes documents."""

    name: str = "MyCustomStage"
    resources: Resources = field(default_factory=lambda: Resources(cpus=2.0))

    # Custom parameters
    threshold: float = 0.5

    def inputs(self) -> tuple[list[str], list[str]]:
        return ["data"], ["text"]

    def outputs(self) -> tuple[list[str], list[str]]:
        return ["data"], ["text", "score"]

    def process(self, task: DocumentBatch) -> DocumentBatch | None:
        # Score every document in the batch
        df = task.data
        df["score"] = df["text"].apply(self._compute_score)

        # Drop the whole batch if its mean score is below the threshold
        if df["score"].mean() < self.threshold:
            return None

        return DocumentBatch(
            task_id=f"{task.task_id}_{self.name}",
            dataset_name=task.dataset_name,
            data=df,
            _metadata=task._metadata,
            _stage_perf=task._stage_perf,
        )

    def _compute_score(self, text: str) -> float:
        # Custom scoring logic
        return len(text) / 1000.0
```

## Configuration with `with_()`

Stages can be configured using the `with_()` method:

```python
from nemo_curator.stages.resources import Resources

stage = MyCustomStage(threshold=0.7)
configured_stage = stage.with_(resources=Resources(cpus=4.0, gpus=1.0))
```

## Source Code

[View source on GitHub](https://github.com/NVIDIA-NeMo/Curator/blob/main/nemo_curator/stages/base.py)
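
## Example: Handling `process()` Return Values

The `process()` method documented above can return a single task, a list of tasks, or `None`. The sketch below illustrates how a caller might normalize those three shapes. It is a standalone illustration and not NeMo Curator's implementation: `ToyTask`, `drop_empty`, `split_lines`, and `run_stage` are hypothetical names introduced here, and no NeMo Curator code is imported.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task; not NeMo Curator's Task class."""

    task_id: str
    lines: list[str] = field(default_factory=list)


def drop_empty(task: ToyTask) -> ToyTask | None:
    """1-to-1 transformation that returns None to filter out empty tasks."""
    return task if task.lines else None


def split_lines(task: ToyTask) -> list[ToyTask]:
    """1-to-many transformation: emit one task per line."""
    return [
        ToyTask(task_id=f"{task.task_id}_{i}", lines=[line])
        for i, line in enumerate(task.lines)
    ]


def run_stage(process, tasks: list[ToyTask]) -> list[ToyTask]:
    """Apply `process` to each task and normalize its return value."""
    outputs: list[ToyTask] = []
    for task in tasks:
        result = process(task)
        if result is None:
            continue  # the stage filtered this task out
        if isinstance(result, list):
            outputs.extend(result)  # splitting/reading: flatten the list
        else:
            outputs.append(result)  # 1-to-1: keep the single task
    return outputs


tasks = [ToyTask("a", ["x", "y"]), ToyTask("b", [])]
kept = run_stage(drop_empty, tasks)    # task "b" is filtered out
split = run_stage(split_lines, kept)   # task "a" becomes two tasks
print([t.task_id for t in kept])       # ['a']
print([t.task_id for t in split])      # ['a_0', 'a_1']
```

Chaining the two calls mirrors how consecutive stages in a pipeline would see each other's outputs: the filter runs first, so the splitting step never receives the empty task.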