---
title: Experimental Executors
description: API reference for experimental executors including Ray-based backends
---

NeMo Curator provides experimental executors for alternative execution backends. These are located in `nemo_curator.backends.experimental`.

Experimental executors are subject to change and may not have full feature parity with `XennaExecutor`.

## RayDataExecutor

Uses Ray Data for distributed execution.

### Import

```python
from nemo_curator.backends.experimental import RayDataExecutor
```

### Usage

```python
executor = RayDataExecutor(
    config={
        "ignore_failures": False,
    },
    ignore_head_node=True,  # Exclude head node from execution
)

# `pipeline` is an existing pipeline object defined elsewhere
results = pipeline.run(executor=executor)
```

### Configuration

As the usage example shows, `ignore_failures` is a key in the `config` dictionary, while `ignore_head_node` is a constructor argument.

| Option             | Type   | Default | Description                      |
| ------------------ | ------ | ------- | -------------------------------- |
| `ignore_failures`  | `bool` | `False` | Continue on task failures        |
| `ignore_head_node` | `bool` | `False` | Exclude head node from execution |

## RayActorPoolExecutor

Uses Ray Actor Pool for distributed execution.

### Import

```python
from nemo_curator.backends.experimental import RayActorPoolExecutor
```

### Usage

```python
executor = RayActorPoolExecutor(
    config={
        "pool_size": 8,
    },
    ignore_head_node=True,  # Exclude head node from execution
)

# `pipeline` is an existing pipeline object defined elsewhere
results = pipeline.run(executor=executor)
```

### Configuration

As the usage example shows, `pool_size` is a key in the `config` dictionary, while `ignore_head_node` is a constructor argument.

| Option             | Type   | Default | Description                      |
| ------------------ | ------ | ------- | -------------------------------- |
| `pool_size`        | `int`  | Auto    | Number of actors in the pool     |
| `ignore_head_node` | `bool` | `False` | Exclude head node from execution |

## BaseExecutor Interface

All executors inherit from `BaseExecutor`:

```python
from abc import ABC, abstractmethod
from typing import Any

from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import Task


class BaseExecutor(ABC):
    """Base class for all executors."""

    def __init__(
        self,
        config: dict[str, Any] | None = None,
        ignore_head_node: bool = False,
    ) -> None:
        """Initialize executor.

        Args:
            config: Executor-specific configuration.
            ignore_head_node: Exclude head node from execution.
        """
        self.config = config or {}
        self.ignore_head_node = ignore_head_node

    @abstractmethod
    def execute(
        self,
        stages: list[ProcessingStage],
        initial_tasks: list[Task] | None = None,
    ) -> list[Task]:
        """Execute pipeline stages.

        Args:
            stages: Processing stages to execute.
            initial_tasks: Initial tasks (defaults to EmptyTask).

        Returns:
            Output tasks from final stage.
        """
```

## Creating Custom Executors

To add a backend, subclass `BaseExecutor` and implement `execute`. The example below runs each stage sequentially in the current process: it sets the stage up, passes every task through `process`, collects the results, and tears the stage down before moving to the next one.

```python
from nemo_curator.backends.base import BaseExecutor
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import EmptyTask, Task


class MyCustomExecutor(BaseExecutor):
    """Custom executor implementation."""

    def execute(
        self,
        stages: list[ProcessingStage],
        initial_tasks: list[Task] | None = None,
    ) -> list[Task]:
        # With no initial tasks, start from a single empty task
        tasks = initial_tasks or [EmptyTask()]

        for stage in stages:
            stage.setup({})
            new_tasks = []

            for task in tasks:
                result = stage.process(task)
                # A stage may return nothing, one task, or a list of tasks
                if result is not None:
                    if isinstance(result, list):
                        new_tasks.extend(result)
                    else:
                        new_tasks.append(result)

            stage.teardown()
            # Output of this stage becomes input to the next
            tasks = new_tasks

        return tasks
```

## Choosing an Executor

| Executor               | Best For                   | Considerations              |
| ---------------------- | -------------------------- | --------------------------- |
| `XennaExecutor`        | Production workloads       | Default choice, most stable |
| `RayDataExecutor`      | Ray-native environments    | Experimental                |
| `RayActorPoolExecutor` | Fine-grained actor control | Experimental                |

## Source Code

[View source on GitHub](https://github.com/NVIDIA-NeMo/Curator/blob/main/nemo_curator/backends/experimental/)
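
## Standalone Sketch of the Sequential Loop

The sequential loop from the Creating Custom Executors section can be tried without installing NeMo Curator or starting a Ray cluster. The sketch below is not the library's implementation: `Task`, `Stage`, `SplitWords`, `DropShort`, `Upper`, and `run_sequentially` are all hypothetical stand-ins invented for illustration, and the stand-in `setup` takes no arguments. It only shows how a stage's return value (nothing, one task, or a list of tasks) shapes the input to the next stage.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


@dataclass
class Task:
    """Hypothetical stand-in for a Curator task; carries one payload."""

    data: Any


class Stage(ABC):
    """Hypothetical stand-in with setup/process/teardown hooks."""

    def setup(self) -> None:
        pass

    def teardown(self) -> None:
        pass

    @abstractmethod
    def process(self, task: Task) -> "Task | list[Task] | None":
        """Return one task, a list of tasks, or None to drop the input."""


class SplitWords(Stage):
    def process(self, task: Task) -> list[Task]:
        # Fan out: one input task becomes one task per word
        return [Task(word) for word in task.data.split()]


class DropShort(Stage):
    def process(self, task: Task) -> "Task | None":
        # Filter: returning None removes the task from the stream
        return task if len(task.data) > 3 else None


class Upper(Stage):
    def process(self, task: Task) -> Task:
        # Map: exactly one output task per input task
        return Task(task.data.upper())


def run_sequentially(stages: list[Stage], tasks: list[Task]) -> list[Task]:
    """Run every stage over all tasks, one stage at a time."""
    for stage in stages:
        stage.setup()
        try:
            next_tasks: list[Task] = []
            for task in tasks:
                result = stage.process(task)
                if result is None:
                    continue
                if isinstance(result, list):
                    next_tasks.extend(result)
                else:
                    next_tasks.append(result)
        finally:
            # Release stage resources even if process() raised
            stage.teardown()
        tasks = next_tasks
    return tasks


output = run_sequentially(
    [SplitWords(), DropShort(), Upper()],
    [Task("ray data and actor pools")],
)
print([task.data for task in output])  # ['DATA', 'ACTOR', 'POOLS']
```

Wrapping the per-stage work in `try`/`finally` is a design choice of this sketch rather than something the source example does; it keeps `teardown` paired with `setup` when a stage fails partway through.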