backends.experimental.ray_data.adapter

Module Contents

Classes

RayDataStageAdapter

Adapts ProcessingStage to Ray Data operations.

Functions

create_actor_from_stage

Create a StageProcessor class with the proper stage name for display.

create_task_from_stage

Create a named Ray Data stage adapter function.

API

class backends.experimental.ray_data.adapter.RayDataStageAdapter(stage: nemo_curator.stages.base.ProcessingStage)

Bases: nemo_curator.backends.base.BaseStageAdapter

Adapts ProcessingStage to Ray Data operations.

This adapter converts stages to work with Ray Data datasets by:

  1. Working directly with Task objects (no dictionary conversion)

  2. Using Ray Data’s map_batches for parallel processing:

     a. If the stage has both gpus and cpus specified, then we use actors

     b. If stage.setup is overridden, then we use actors

     c. Otherwise, we use tasks
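The selection rule above can be sketched in plain Python. This is a minimal illustration, not the adapter's actual code: the `Resources` and `ProcessingStage` classes here are simplified, hypothetical stand-ins, the helper names `overrides_setup` and `uses_actors` are invented for the example, and "specified" is assumed to mean a value greater than zero.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Resources:
    """Hypothetical stand-in for a stage's resource request."""

    cpus: float = 1.0
    gpus: float = 0.0


class ProcessingStage:
    """Hypothetical, simplified stand-in for the real base class."""

    resources = Resources()

    def setup(self) -> None:
        """Default no-op; subclasses may override it to load models, etc."""


class TokenCount(ProcessingStage):
    """CPU-only stage with no custom setup."""


class Classifier(ProcessingStage):
    """Stage that requests both CPUs and GPUs."""

    resources = Resources(cpus=1.0, gpus=1.0)


class Tokenize(ProcessingStage):
    """CPU-only stage that overrides setup."""

    def setup(self) -> None:
        self.vocab = {"hello": 0}


def overrides_setup(stage: ProcessingStage) -> bool:
    # Compare the function found on the stage's class with the base one.
    return type(stage).setup is not ProcessingStage.setup


def uses_actors(stage: ProcessingStage) -> bool:
    # Assumption: "specified" means a request greater than zero.
    has_cpus_and_gpus = stage.resources.cpus > 0 and stage.resources.gpus > 0
    return has_cpus_and_gpus or overrides_setup(stage)
```

Under these assumptions, `TokenCount` would run as tasks, while `Classifier` and `Tokenize` would run as actors, since actors keep the state created in `setup` alive across batches.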

Initialization

property batch_size: int | None

Get the batch size for this stage.

process_dataset(dataset: ray.data.Dataset) → ray.data.Dataset

Process a Ray Data dataset through this stage.

Args: dataset (Dataset): Ray Data dataset containing Task objects

Returns: Dataset: Processed Ray Data dataset

backends.experimental.ray_data.adapter.create_actor_from_stage(
stage: nemo_curator.stages.base.ProcessingStage,
) → type[backends.experimental.ray_data.adapter.RayDataStageAdapter]

Create a StageProcessor class with the proper stage name for display.
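One common way to give a generated class a readable display name is the three-argument form of `type()`. The sketch below illustrates that general technique only; it is not taken from the adapter's source. `UppercaseStage`, its `name` attribute, its `process_batch` method, and the helper `make_named_processor` are all hypothetical names chosen for the example.

```python
class UppercaseStage:
    """Hypothetical stage; `name`, `setup`, and `process_batch` are assumed."""

    name = "UppercaseStage"

    def setup(self) -> None:
        self.ready = True

    def process_batch(self, items: list[str]) -> list[str]:
        return [s.upper() for s in items]


class StageProcessor:
    """Hypothetical callable wrapper that runs setup once per instance."""

    def __init__(self, stage) -> None:
        self._stage = stage
        self._stage.setup()

    def __call__(self, items: list[str]) -> list[str]:
        return self._stage.process_batch(items)


def make_named_processor(stage) -> type[StageProcessor]:
    # Build a subclass whose __name__ is the stage name, so tools that
    # display the class name show the stage rather than "StageProcessor".
    return type(stage.name, (StageProcessor,), {})
```

With this sketch, `make_named_processor(UppercaseStage()).__name__` is `"UppercaseStage"`, and instances of the generated class behave exactly like `StageProcessor`.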

backends.experimental.ray_data.adapter.create_task_from_stage(
stage: nemo_curator.stages.base.ProcessingStage,
) → collections.abc.Callable[[dict[str, Any]], dict[str, Any]]

Create a named Ray Data stage adapter function.

This creates a standalone function that wraps the stage processing logic with a clean name that doesn’t include the class qualification.

Args: stage (ProcessingStage): Processing stage to adapt

Returns: Callable: A function that can be used directly with Ray Data’s map_batches
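The idea of a standalone, cleanly named wrapper can be sketched with a closure whose `__name__` and `__qualname__` are overwritten. This is an illustrative assumption about the approach, not the library's implementation: `LowercaseStage`, its `name` attribute, its `process_batch` method, the helper `make_named_task`, and the `"item"` column name are all hypothetical.

```python
from collections.abc import Callable
from typing import Any


class LowercaseStage:
    """Hypothetical stage with a display name and a batch-level method."""

    name = "lowercase_text"

    def process_batch(self, items: list[str]) -> list[str]:
        return [s.lower() for s in items]


def make_named_task(stage) -> Callable[[dict[str, Any]], dict[str, Any]]:
    def stage_fn(batch: dict[str, Any]) -> dict[str, Any]:
        # "item" is an assumed column name for this sketch.
        return {"item": stage.process_batch(list(batch["item"]))}

    # Without this, the qualified name would be
    # "make_named_task.<locals>.stage_fn", which is noisy in displays.
    stage_fn.__name__ = stage.name
    stage_fn.__qualname__ = stage.name
    return stage_fn
```

A function built this way takes a batch dictionary and returns one, matching the `Callable[[dict[str, Any]], dict[str, Any]]` shape in the signature above, so it has the form that a batch-mapping call expects.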