stages.base#
Module Contents#
Classes#
CompositeStage | Base class for high-level composite stages.
ProcessingStage | Base class for all processing stages.
StageMeta | Metaclass that automatically registers concrete Stage subclasses.
Functions#
get_stage_class | Retrieve a registered stage class by its class name.
Data#
API#
- class stages.base.CompositeStage#
Bases: stages.base.ProcessingStage[stages.base.X, stages.base.Y], abc.ABC
Base class for high-level composite stages.
Composite stages are user-facing stages that decompose into multiple low-level execution stages during pipeline planning. They provide a simplified API while maintaining fine-grained control at execution time.
Composite stages never actually execute - they only exist to be decomposed into their constituent execution stages.
- abstractmethod decompose() list[stages.base.ProcessingStage] #
Decompose into execution stages.
This method must be implemented by composite stages to define what low-level stages they represent.
Returns (list[ProcessingStage]): List of execution stages that will actually run
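The decomposition idea can be sketched with small stand-in classes. Nothing here imports the real `stages.base` module: `SketchStage`, `SketchComposite`, `ReadStage`, `FilterStage`, `ReadAndFilter`, and `plan` are hypothetical names used only for illustration, and the planning step is an assumption about how a pipeline might expand composites.

```python
from abc import ABC, abstractmethod


class SketchStage(ABC):
    """Hypothetical stand-in for an execution stage (not the real ProcessingStage)."""

    @abstractmethod
    def process(self, task):
        ...


class SketchComposite(SketchStage):
    """Hypothetical stand-in for a composite stage."""

    @abstractmethod
    def decompose(self) -> list[SketchStage]:
        ...

    def process(self, task):
        # Mirrors the documented rule: composites are never executed directly.
        raise RuntimeError("composite stages must be decomposed before execution")


class ReadStage(SketchStage):
    def process(self, task):
        return task.split(",")  # one input becomes several outputs


class FilterStage(SketchStage):
    def __init__(self, min_len: int):
        self.min_len = min_len

    def process(self, task):
        return task if len(task) >= self.min_len else None


class ReadAndFilter(SketchComposite):
    """User-facing stage that stands for a reader followed by a filter."""

    def __init__(self, min_len: int):
        self.min_len = min_len

    def decompose(self) -> list[SketchStage]:
        return [ReadStage(), FilterStage(self.min_len)]


def plan(stages: list[SketchStage]) -> list[SketchStage]:
    """Expand every composite into its execution stages, keeping order."""
    planned: list[SketchStage] = []
    for stage in stages:
        if isinstance(stage, SketchComposite):
            planned.extend(stage.decompose())
        else:
            planned.append(stage)
    return planned


planned_names = [type(s).__name__ for s in plan([ReadAndFilter(min_len=3)])]
```

In this sketch the user configures one object (`ReadAndFilter`), while the planned pipeline contains only the two execution stages.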
- decompose_and_apply_with() list[stages.base.ProcessingStage] #
Decompose and apply configuration changes to this stage.
- get_description() str #
Get a description of what this composite stage does.
Override this to provide user-friendly documentation.
- inputs() tuple[list[str], list[str]] #
Get the inputs for this stage.
- outputs() tuple[list[str], list[str]] #
Get the outputs for this stage.
- process(task: stages.base.X) stages.base.Y | list[stages.base.Y] #
Composite stages should never be executed directly.
- with_(stage_with_dict: dict[str, Any]) stages.base.CompositeStage #
Apply configuration changes to this stage.
- class stages.base.ProcessingStage#
Bases: abc.ABC, typing.Generic[stages.base.X, stages.base.Y]
Base class for all processing stages. Processing stages operate on Task objects (or subclasses like DocumentBatch). Each stage type can declare what type of Task it processes as input (X) and what type it produces as output (Y). Stages can return either:
A single task (typical for transformations)
A list of tasks (for stages that split work, like readers)
None (for filtered out tasks)
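The three return shapes listed above can be illustrated with plain functions standing in for stages. This is a sketch only: `normalize_result` is a hypothetical helper, and how a real executor consumes stage results is an assumption, not something this page specifies.

```python
def normalize_result(result):
    """Turn a stage result (task, list of tasks, or None) into a flat list."""
    if result is None:
        return []        # the task was filtered out
    if isinstance(result, list):
        return result    # the stage split one task into several
    return [result]      # ordinary 1-to-1 transformation


def upper(task: str) -> str:
    return task.upper()              # single task


def split_words(task: str) -> list[str]:
    return task.split()              # list of tasks


def drop_short(task: str):
    return task if len(task) > 10 else None   # None for filtered tasks


results = [normalize_result(fn("ab cd")) for fn in (upper, split_words, drop_short)]
```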
- property batch_size: int | None#
Number of tasks to process in a batch.
- get_config() dict[str, Any] #
Get configuration for this stage.
Returns (dict[str, Any]): Dictionary containing configuration for this stage
- inputs() tuple[list[str], list[str]] #
Define stage input requirements.
Returns (tuple[list[str], list[str]]): Tuple of (required_attributes, required_columns) where:
required_attributes: List of top-level task attributes that must be present
required_columns: List of attributes within the data that must be present
- property name: str#
- num_workers() int | None #
Number of workers required. If None, the executor determines the number of workers.
- outputs() tuple[list[str], list[str]] #
Define stage output specification.
Returns (tuple[list[str], list[str]]): Tuple of (output_attributes, output_columns) where:
output_attributes: List of top-level task attributes this stage adds/modifies
output_columns: List of attributes within the data that this stage adds/modifies
- abstractmethod process(task: stages.base.X) stages.base.Y | list[stages.base.Y] #
Process a task and return the result.
Args: task (X): Input task to process
Returns (Y | list[Y]):
Single task: For 1-to-1 transformations
List of tasks: For 1-to-many transformations (e.g., readers)
None: If the task should be filtered out
- process_batch(tasks: list[stages.base.X]) list[stages.base.Y] #
Process a batch of tasks and return results.
Override this method to enable batch processing for your stage. If not overridden, the stage will only support single-task processing.
Args: tasks (list[X]): List of input tasks to process
Returns (list[Y]): List of results, where each result can be:
Single task: For 1-to-1 transformations
List of tasks: For 1-to-many transformations
None: If the task should be filtered out
Note: The returned list should have the same length as the input list, with each element corresponding to the result of processing the task at the same index.
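The length contract in the note above can be sketched as follows. `KeepNonEmpty` and `run_batch` are hypothetical names, tasks are plain strings rather than real Task objects, and the explicit length check is an illustration rather than something the library is known to perform.

```python
class KeepNonEmpty:
    """Hypothetical stage: upper-cases non-empty strings and filters empty ones."""

    def process(self, task: str):
        return task.upper() if task else None

    def process_batch(self, tasks: list[str]) -> list:
        # One result per input, in input order; None marks a filtered task
        # so that the remaining results stay aligned with their inputs.
        return [self.process(t) for t in tasks]


def run_batch(stage, tasks):
    """Hypothetical helper that enforces the same-length contract."""
    results = stage.process_batch(tasks)
    if len(results) != len(tasks):
        raise ValueError("process_batch must return one result per input task")
    return results


batch_results = run_batch(KeepNonEmpty(), ["a", "", "bcd"])
```

Keeping `None` in place of a filtered task, instead of dropping the entry, is what preserves the index correspondence between inputs and results.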
- ray_stage_spec() dict[str, Any] #
Get Ray configuration for this stage.
Note: This is only used for Ray Data, which is an experimental backend. The keys are defined in RayStageSpecKeys in backends/experimental/ray_data/utils.py.
Returns (dict[str, Any]): Dictionary containing Ray-specific configuration
- property resources: nemo_curator.stages.resources.Resources#
- setup(worker_metadata: nemo_curator.backends.base.WorkerMetadata | None = None) #
Setup method called once before processing begins.
Override this method to perform any initialization that should happen once per worker.
Args: worker_metadata (WorkerMetadata, optional): Information about the worker (provided by some backends)
- setup_on_node(node_info: nemo_curator.backends.base.NodeInfo | None = None, worker_metadata: nemo_curator.backends.base.WorkerMetadata | None = None) #
Setup method called once per node in distributed settings.
Override this method to perform node-level initialization.
Args:
node_info (NodeInfo, optional): Information about the node (provided by some backends)
worker_metadata (WorkerMetadata, optional): Information about the worker (provided by some backends)
- supports_batch_processing() bool #
Whether this stage supports vectorized batch processing. This is automatically determined by checking if the stage has overridden the process_batch method from the base class.
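One common way to detect such an override in Python is to compare the method found on the instance's class with the one defined on the base class. The sketch below shows that technique with hypothetical classes (`BaseSketch`, `PlainStage`, `BatchedStage`); whether the real base class uses exactly this check, and what its default `process_batch` does, are assumptions.

```python
class BaseSketch:
    """Hypothetical base class, not the real ProcessingStage."""

    def process(self, task):
        raise NotImplementedError

    def process_batch(self, tasks):
        # Assumed default: fall back to per-task processing.
        return [self.process(t) for t in tasks]

    def supports_batch_processing(self) -> bool:
        # A subclass that overrides process_batch exposes a different
        # function object on its class than the one defined here.
        return type(self).process_batch is not BaseSketch.process_batch


class PlainStage(BaseSketch):
    def process(self, task):
        return task + 1


class BatchedStage(BaseSketch):
    def process(self, task):
        return task + 1

    def process_batch(self, tasks):
        return [t + 1 for t in tasks]


plain_supports = PlainStage().supports_batch_processing()
batched_supports = BatchedStage().supports_batch_processing()
```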
- teardown() None #
Teardown method called once after processing ends. Override this method to perform any cleanup.
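The lifecycle hooks can be pictured with a toy single-process driver. `RecordingStage` and `run_locally` are hypothetical, and the relative order of `setup_on_node` and `setup` shown here is an assumption; real backends decide when and where each hook runs.

```python
class RecordingStage:
    """Hypothetical stage that records the order in which hooks are called."""

    def __init__(self):
        self.calls = []

    def setup_on_node(self, node_info=None, worker_metadata=None):
        self.calls.append("setup_on_node")   # once per node

    def setup(self, worker_metadata=None):
        self.calls.append("setup")           # once per worker

    def process(self, task):
        self.calls.append(f"process:{task}")
        return task

    def teardown(self):
        self.calls.append("teardown")        # once after processing ends


def run_locally(stage, tasks):
    """Toy driver: one node, one worker, cleanup guaranteed by try/finally."""
    stage.setup_on_node()
    stage.setup()
    try:
        return [stage.process(t) for t in tasks]
    finally:
        stage.teardown()


stage = RecordingStage()
outputs = run_locally(stage, ["a", "b"])
```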
- validate_input(task: nemo_curator.tasks.Task) bool #
Validate input task meets requirements.
Args: task: Task to validate
Returns: True if valid, False otherwise
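A minimal sketch of how validation could be derived from the `inputs()` declaration is shown below. `SketchTask` and `NeedsText` are hypothetical, the task's data is modeled as a plain dict, and the checking logic is an assumption about one reasonable implementation rather than the library's actual code.

```python
from dataclasses import dataclass, field


@dataclass
class SketchTask:
    """Hypothetical task: top-level attributes plus a dict of data columns."""
    task_id: str
    data: dict = field(default_factory=dict)


class NeedsText:
    """Hypothetical stage that requires a task_id attribute and a text column."""

    def inputs(self) -> tuple[list[str], list[str]]:
        return ["task_id"], ["text"]

    def validate_input(self, task) -> bool:
        required_attrs, required_columns = self.inputs()
        has_attrs = all(hasattr(task, attr) for attr in required_attrs)
        has_columns = all(col in task.data for col in required_columns)
        return has_attrs and has_columns


stage = NeedsText()
valid = stage.validate_input(SketchTask("t1", {"text": ["hello"]}))
invalid = stage.validate_input(SketchTask("t2", {"url": ["example"]}))
```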
- with_(name: str | None = None, resources: nemo_curator.stages.resources.Resources | None = None, batch_size: int | None = None) #
Apply configuration changes to this stage with overridden properties.
Note: This method uses class-level attributes and instance attributes interchangeably, which can sometimes lead to unexpected behavior. See https://github.com/NVIDIA-NeMo/Curator/pull/764 for more details.
Args:
name: Override the name property
resources: Override the resources property
batch_size: Override the batch_size property
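The note about class-level and instance attributes refers to ordinary Python attribute lookup, which the sketch below illustrates in general terms. It does not reproduce the specific behavior discussed in the linked pull request: `SketchStage` is hypothetical, and returning a shallow copy from `with_` is an assumption made for the example.

```python
import copy


class SketchStage:
    """Hypothetical stage with class-level defaults."""
    name = "sketch"
    batch_size = 1

    def with_(self, name=None, batch_size=None):
        new = copy.copy(self)  # assumption: overrides go on a copy
        if name is not None:
            new.name = name                # instance attribute shadows the class one
        if batch_size is not None:
            new.batch_size = batch_size
        return new


base = SketchStage()
tuned = base.with_(batch_size=8)

# Changing the class-level default later affects only instances that
# never received their own instance attribute.
SketchStage.batch_size = 4
observed = (base.batch_size, tuned.batch_size)
```

Here `base` picks up the new class default while `tuned` keeps its own value, which is the kind of divergence that can be surprising when the two attribute levels are mixed.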
- xenna_stage_spec() dict[str, Any] #
Get Xenna configuration for this stage.
Returns (dict[str, Any]): Dictionary containing Xenna-specific configuration
- class stages.base.StageMeta#
Bases: abc.ABCMeta
Metaclass that automatically registers concrete Stage subclasses. A class is considered concrete if it directly inherits from ProcessingStage and implements a name property. Abstract helper classes (e.g. ProcessingStage itself) will not be added to the registry because they have the _is_abstract attribute set.
- stages.base.X#
‘TypeVar(…)’
- stages.base.Y#
‘TypeVar(…)’
- stages.base.get_stage_class(name: str) type[ProcessingStage] #
Retrieve a registered stage class by its class name.
Raises: KeyError: If no stage with that name is registered.