nemo_curator.backends.experimental.ray_actor_pool.executor

Module Contents

Classes

| Name | Description |
| --- | --- |
| RayActorPoolExecutor | Ray-based executor using ActorPool for better resource management. |

Functions

| Name | Description |
| --- | --- |
| _parse_runtime_env | - |

Data

_LARGE_INT

API

class nemo_curator.backends.experimental.ray_actor_pool.executor.RayActorPoolExecutor(
config: dict | None = None,
ignore_head_node: bool = False,
show_progress: bool = True,
progress_interval: float = 10.0
)

Bases: BaseExecutor

Ray-based executor using ActorPool for better resource management.

This executor:

  1. Creates a pool of actors per stage using Ray’s ActorPool
  2. Uses map_unordered for better load balancing and fault tolerance
  3. Lets Ray handle object ownership and garbage collection automatically
  4. Provides better backpressure management through ActorPool
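
The pattern described in the list above can be sketched with Ray's public `ActorPool` API. This is a minimal illustration rather than the executor's actual implementation: `SquareWorker`, `process`, and `run_with_actor_pool` are hypothetical names, and the sketch assumes `ray` is installed and can start a local cluster.

```python
def run_with_actor_pool(values, num_actors=2):
    """Process values through a small Ray ActorPool (illustrative only)."""
    # Imported lazily so this helper can be defined without Ray installed.
    import ray
    from ray.util.actor_pool import ActorPool

    @ray.remote
    class SquareWorker:  # hypothetical stand-in for a per-stage actor
        def process(self, value):
            return value * value

    ray.init(ignore_reinit_error=True)
    actors = [SquareWorker.remote() for _ in range(num_actors)]
    pool = ActorPool(actors)
    try:
        # map_unordered yields results as they complete, not in input order,
        # so a slow item does not hold up results from idle actors.
        return list(
            pool.map_unordered(lambda actor, v: actor.process.remote(v), values)
        )
    finally:
        # Release the actors' resources once the work is done.
        for actor in actors:
            ray.kill(actor)
```

Because results arrive in completion order, callers that need a stable order should sort or key the results themselves.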

nemo_curator.backends.experimental.ray_actor_pool.executor.RayActorPoolExecutor._cleanup_actor_pool(
actor_pool: ray.util.actor_pool.ActorPool
) -> None

Clean up actors in the pool.

nemo_curator.backends.experimental.ray_actor_pool.executor.RayActorPoolExecutor._cleanup_actors(
actors: list[ray.actor.ActorHandle]
) -> None

Clean up a list of actors.

nemo_curator.backends.experimental.ray_actor_pool.executor.RayActorPoolExecutor._create_actor_pool(
stage: nemo_curator.stages.base.ProcessingStage,
num_actors: int
) -> ray.util.actor_pool.ActorPool

Create an ActorPool for a specific stage.

nemo_curator.backends.experimental.ray_actor_pool.executor.RayActorPoolExecutor._create_raft_actor_pool(
stage: nemo_curator.stages.base.ProcessingStage,
num_actors: int,
session_id: bytes
) -> ray.util.actor_pool.ActorPool

Create a RAFT ActorPool for a specific stage.

nemo_curator.backends.experimental.ray_actor_pool.executor.RayActorPoolExecutor._create_rapidsmpf_actors(
stage: nemo_curator.stages.base.ProcessingStage,
num_actors: int,
num_tasks: int
) -> list[ray.actor.ActorHandle]

Create RapidsMPFShuffling actors and set up UCXX communication for a specific stage.

nemo_curator.backends.experimental.ray_actor_pool.executor.RayActorPoolExecutor._execute_lsh_stage(
stage: nemo_curator.stages.deduplication.fuzzy.lsh.stage.LSHStage,
input_tasks: list[nemo_curator.tasks.Task]
) -> list[nemo_curator.tasks.Task]

Execute an LSH stage with band iteration.

Parameters:

stage
LSHStage

The LSH stage to execute

input_tasks
list[Task]

Input tasks to process

Returns: list[Task]

List of output tasks from all band iterations
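
To make "band iteration" concrete, the sketch below shows one way a set of LSH bands could be split into `(start, end)` ranges that are processed one iteration at a time. The helper name `iter_band_ranges`, its parameters, and the half-open range convention are assumptions for illustration; the source only states that a `band_range` is a `tuple[int, int]`.

```python
from __future__ import annotations


def iter_band_ranges(num_bands: int, bands_per_iteration: int) -> list[tuple[int, int]]:
    """Split [0, num_bands) into consecutive half-open (start, end) ranges."""
    if num_bands <= 0 or bands_per_iteration <= 0:
        raise ValueError("num_bands and bands_per_iteration must be positive")
    return [
        # The last range is clipped so it never runs past num_bands.
        (start, min(start + bands_per_iteration, num_bands))
        for start in range(0, num_bands, bands_per_iteration)
    ]


ranges = iter_band_ranges(num_bands=20, bands_per_iteration=8)
print(ranges)
```

Under these assumptions, each range would drive one shuffle pass, and the outputs of all passes would be concatenated into the returned task list.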

nemo_curator.backends.experimental.ray_actor_pool.executor.RayActorPoolExecutor._generate_task_batches(
tasks: list[nemo_curator.tasks.Task],
batch_size: int | None = None,
num_output_tasks: int | None = None
) -> list[list[nemo_curator.tasks.Task]]

Generate task batches from a list of tasks. Either batch_size or num_output_tasks must be provided, but not both.

Parameters:

tasks
list[Task]

List of Task objects to process

batch_size
int | None, defaults to None

The size of the batch

num_output_tasks
int | None, defaults to None

The number of output tasks to generate

Returns: list[list[Task]]

List of task batches
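
The "exactly one of `batch_size` or `num_output_tasks`" contract can be illustrated with a standalone sketch. This is not the library's implementation: `generate_batches` is a hypothetical helper, and deriving the batch size by ceiling division is an assumption about how `num_output_tasks` might be honored.

```python
from __future__ import annotations

import math
from typing import TypeVar

T = TypeVar("T")


def generate_batches(
    tasks: list[T],
    batch_size: int | None = None,
    num_output_tasks: int | None = None,
) -> list[list[T]]:
    """Split tasks into batches, given exactly one sizing argument."""
    # Reject both "neither given" and "both given".
    if (batch_size is None) == (num_output_tasks is None):
        raise ValueError("Provide exactly one of batch_size or num_output_tasks")
    if not tasks:
        return []
    if batch_size is None:
        if num_output_tasks <= 0:
            raise ValueError("num_output_tasks must be positive")
        # Assumption: size batches so that at most num_output_tasks are produced.
        batch_size = math.ceil(len(tasks) / num_output_tasks)
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [tasks[i : i + batch_size] for i in range(0, len(tasks), batch_size)]


print(generate_batches(list(range(10)), batch_size=4))
```

With ceiling division the result contains at most `num_output_tasks` batches; it can contain fewer when the task count does not divide evenly.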

nemo_curator.backends.experimental.ray_actor_pool.executor.RayActorPoolExecutor._process_shuffle_stage_with_rapidsmpf_actors(
actors: list[ray.actor.ActorHandle],
tasks: list[nemo_curator.tasks.Task],
band_range: tuple[int, int] | None = None
) -> list[nemo_curator.tasks.Task]

Process Shuffle through the actors.

Parameters:

actors
list[ActorHandle]

The actors to use for processing

tasks
list[Task]

List of Task objects to process

band_range
tuple[int, int] | None, defaults to None

Band range for LSH shuffle

Returns: list[Task]

List of processed Task objects

nemo_curator.backends.experimental.ray_actor_pool.executor.RayActorPoolExecutor._process_stage_with_pool(
actor_pool: ray.util.actor_pool.ActorPool,
_stage: nemo_curator.stages.base.ProcessingStage,
tasks: list[nemo_curator.tasks.Task]
) -> list[nemo_curator.tasks.Task]

Process tasks through the actor pool.

Parameters:

actor_pool
ActorPool

The ActorPool to use for processing

_stage
ProcessingStage

The processing stage (for logging/context, unused)

tasks
list[Task]

List of Task objects to process

Returns: list[Task]

List of processed Task objects

nemo_curator.backends.experimental.ray_actor_pool.executor.RayActorPoolExecutor.execute(
stages: list[nemo_curator.stages.base.ProcessingStage],
initial_tasks: list[nemo_curator.tasks.Task] | None = None
) -> list[nemo_curator.tasks.Task]

Execute the pipeline stages using ActorPool.

Parameters:

stages
list[ProcessingStage]

List of processing stages to execute

initial_tasks
list[Task] | None, defaults to None

Initial tasks to process (can be None for empty start)

Returns: list[Task]

List of final processed tasks
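
A minimal usage sketch based only on the signatures documented above. The wrapper `run_pipeline` is a hypothetical name, the `stages` list is assumed to be built elsewhere from `ProcessingStage` instances, and the sketch assumes `nemo_curator` and Ray are installed.

```python
def run_pipeline(stages, initial_tasks=None):
    """Run a list of ProcessingStage objects with RayActorPoolExecutor."""
    # Imported lazily so this helper can be defined without NeMo Curator installed.
    from nemo_curator.backends.experimental.ray_actor_pool.executor import (
        RayActorPoolExecutor,
    )

    # All arguments below are the documented defaults, spelled out for clarity.
    executor = RayActorPoolExecutor(
        config=None,
        ignore_head_node=False,
        show_progress=True,
        progress_interval=10.0,
    )
    # initial_tasks may be None when the first stage generates its own tasks.
    return executor.execute(stages, initial_tasks=initial_tasks)
```

The return value is the list of tasks emitted by the final stage.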

nemo_curator.backends.experimental.ray_actor_pool.executor._parse_runtime_env(
runtime_env: dict
) -> dict

nemo_curator.backends.experimental.ray_actor_pool.executor._LARGE_INT = 2 ** 31 - 1