Experimental Executors

NeMo Curator provides experimental executors for alternative execution backends. These are located in nemo_curator.backends.experimental.

Experimental executors are subject to change and may not have full feature parity with XennaExecutor.

RayDataExecutor

Uses Ray Data for distributed execution.

Import

```python
from nemo_curator.backends.experimental import RayDataExecutor
```

Usage

```python
executor = RayDataExecutor(
    config={
        "ignore_failures": False,
    },
    ignore_head_node=True,  # Exclude head node from execution
)

# `pipeline` is a pipeline object defined elsewhere
results = pipeline.run(executor=executor)
```

Configuration

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| ignore_failures | bool | False | Continue on task failures |
| ignore_head_node | bool | False | Exclude head node from execution |
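
To make the "continue on task failures" behavior concrete, here is a minimal, library-free sketch of the two modes. The `run_tasks` helper and its return shape are hypothetical and are not part of NeMo Curator or Ray Data. The sketch only illustrates the difference between failing fast and skipping failed tasks, not how RayDataExecutor implements it.

```python
from typing import Any, Callable, Iterable


def run_tasks(
    tasks: Iterable[Any],
    process: Callable[[Any], Any],
    ignore_failures: bool = False,
) -> tuple[list[Any], list[tuple[Any, Exception]]]:
    """Apply `process` to each task (hypothetical stand-in for an executor loop)."""
    results: list[Any] = []
    failures: list[tuple[Any, Exception]] = []
    for task in tasks:
        try:
            results.append(process(task))
        except Exception as exc:
            if not ignore_failures:
                raise  # fail fast: the first error aborts the whole run
            failures.append((task, exc))  # record the failure and keep going
    return results, failures


def reciprocal(x: float) -> float:
    return 1 / x


# With ignore_failures=True the failing task (0) is skipped, not fatal.
ok, failed = run_tasks([1, 0, 4], reciprocal, ignore_failures=True)
```

With `ignore_failures=False`, the same call would raise `ZeroDivisionError` on the second task and return nothing.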

RayActorPoolExecutor

Uses Ray Actor Pool for distributed execution.

Import

```python
from nemo_curator.backends.experimental import RayActorPoolExecutor
```

Usage

```python
executor = RayActorPoolExecutor(
    config={
        "pool_size": 8,
    },
    ignore_head_node=True,  # Exclude head node from execution
)

# `pipeline` is a pipeline object defined elsewhere
results = pipeline.run(executor=executor)
```

Configuration

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| pool_size | int | Auto | Number of actors in the pool |
| ignore_head_node | bool | False | Exclude head node from execution |

BaseExecutor Interface

All executors inherit from BaseExecutor:

```python
from abc import ABC, abstractmethod
from typing import Any

from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import Task


class BaseExecutor(ABC):
    """Base class for all executors."""

    def __init__(
        self,
        config: dict[str, Any] | None = None,
        ignore_head_node: bool = False,
    ) -> None:
        """Initialize executor.

        Args:
            config: Executor-specific configuration.
            ignore_head_node: Exclude head node from execution.
        """
        self.config = config or {}
        self.ignore_head_node = ignore_head_node

    @abstractmethod
    def execute(
        self,
        stages: list[ProcessingStage],
        initial_tasks: list[Task] | None = None,
    ) -> list[Task]:
        """Execute pipeline stages.

        Args:
            stages: Processing stages to execute.
            initial_tasks: Initial tasks (defaults to EmptyTask).

        Returns:
            Output tasks from final stage.
        """
```
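
Because `execute` is declared with `@abstractmethod`, a subclass that does not override it cannot be instantiated. The sketch below shows this with a stand-in class. `SketchExecutor`, `IncompleteExecutor`, and `PassThroughExecutor` are hypothetical names used only for illustration. They mirror the shape of `BaseExecutor` but do not import NeMo Curator.

```python
from abc import ABC, abstractmethod
from typing import Any, Optional


class SketchExecutor(ABC):
    """Hypothetical stand-in mirroring the BaseExecutor shape."""

    def __init__(
        self,
        config: Optional[dict[str, Any]] = None,
        ignore_head_node: bool = False,
    ) -> None:
        self.config = config or {}  # None becomes an empty dict
        self.ignore_head_node = ignore_head_node

    @abstractmethod
    def execute(
        self,
        stages: list[Any],
        initial_tasks: Optional[list[Any]] = None,
    ) -> list[Any]:
        """Run the stages and return the output tasks."""


class IncompleteExecutor(SketchExecutor):
    """Does not implement execute(), so the class stays abstract."""


class PassThroughExecutor(SketchExecutor):
    """Implements execute() by returning the initial tasks unchanged."""

    def execute(self, stages, initial_tasks=None):
        return list(initial_tasks or [])


# Instantiating a subclass that lacks execute() raises TypeError.
try:
    IncompleteExecutor()
    instantiated = True
except TypeError:
    instantiated = False

executor = PassThroughExecutor(config=None, ignore_head_node=True)
```

Here `instantiated` ends up `False`, while `PassThroughExecutor` constructs normally and exposes `config` as an empty dict.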

Creating Custom Executors

```python
from nemo_curator.backends.base import BaseExecutor
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import EmptyTask, Task  # EmptyTask import path is assumed


class MyCustomExecutor(BaseExecutor):
    """Custom executor implementation that runs stages sequentially."""

    def execute(
        self,
        stages: list[ProcessingStage],
        initial_tasks: list[Task] | None = None,
    ) -> list[Task]:
        # Fall back to a single empty task when no initial tasks are given.
        # If EmptyTask is a prebuilt instance in your version, use [EmptyTask].
        tasks = initial_tasks or [EmptyTask()]

        for stage in stages:
            stage.setup({})
            new_tasks = []
            for task in tasks:
                result = stage.process(task)
                if result is not None:  # None means the task was filtered out
                    if isinstance(result, list):
                        new_tasks.extend(result)  # stage fanned out into several tasks
                    else:
                        new_tasks.append(result)
            stage.teardown()
            tasks = new_tasks  # outputs of this stage feed the next one

        return tasks
```
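
The loop above can be tried without a NeMo Curator installation by swapping in toy stages. In this sketch, `ToyTask`, `ToyStage`, `SplitWords`, `DropShort`, and `run_sequential` are all hypothetical stand-ins. They only imitate the `setup`/`process`/`teardown` calls the loop makes, to show how list results fan out and `None` results drop a task.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class ToyTask:
    """Hypothetical stand-in for a Task: wraps a list of words."""
    words: list[str]


class ToyStage:
    """Hypothetical stand-in exposing the methods the loop calls."""

    def setup(self, metadata: dict) -> None:
        pass

    def process(self, task: ToyTask) -> Union[ToyTask, list[ToyTask], None]:
        raise NotImplementedError

    def teardown(self) -> None:
        pass


class SplitWords(ToyStage):
    """Fan-out: returns one task per word."""

    def process(self, task: ToyTask) -> list[ToyTask]:
        return [ToyTask([w]) for w in task.words]


class DropShort(ToyStage):
    """Filter: returning None removes the task from the output."""

    def process(self, task: ToyTask) -> Optional[ToyTask]:
        return task if len(task.words[0]) > 2 else None


def run_sequential(stages: list[ToyStage], tasks: list[ToyTask]) -> list[ToyTask]:
    """Same control flow as MyCustomExecutor.execute, minus the library imports."""
    for stage in stages:
        stage.setup({})
        new_tasks: list[ToyTask] = []
        for task in tasks:
            result = stage.process(task)
            if result is None:
                continue
            if isinstance(result, list):
                new_tasks.extend(result)
            else:
                new_tasks.append(result)
        stage.teardown()
        tasks = new_tasks
    return tasks


out = run_sequential([SplitWords(), DropShort()], [ToyTask(["ray", "is", "fast"])])
print([t.words for t in out])  # prints [['ray'], ['fast']]
```

Each stage runs to completion over every task before the next stage starts, so this pattern is useful mainly for debugging or small inputs.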

Choosing an Executor

| Executor | Best For | Considerations |
| --- | --- | --- |
| XennaExecutor | Production workloads | Default choice, most stable |
| RayDataExecutor | Ray-native environments | Experimental |
| RayActorPoolExecutor | Fine-grained actor control | Experimental |
