nemo_curator.backends.experimental.ray_actor_pool.shuffle_adapter


Module Contents

Classes

Name | Description
ShuffleStageAdapter | Ray actor that wraps a shuffle stage and its actor.

API

class nemo_curator.backends.experimental.ray_actor_pool.shuffle_adapter.ShuffleStageAdapter(
stage: nemo_curator.stages.shuffler.stage.ShuffleStage | nemo_curator.stages.deduplication.fuzzy.lsh.stage.LSHStage,
rank: int,
nranks: int,
num_input_tasks: int | None = None
)

Bases: BaseStageAdapter

Ray actor that wraps a shuffle stage and its actor.

This adapter manages the lifecycle of a shuffle actor (like LSHActor) and provides a uniform interface for the executor.

_batch_size = self.stage.batch_size
output_nparts
nemo_curator.backends.experimental.ray_actor_pool.shuffle_adapter.ShuffleStageAdapter.extract_and_write() -> list[nemo_curator.tasks.FileGroupTask]

Extract shuffled data and write to output files.

nemo_curator.backends.experimental.ray_actor_pool.shuffle_adapter.ShuffleStageAdapter.get_batch_size() -> int

Get the batch size for this stage.
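As an illustration of how a caller might use this value, the sketch below groups a list of tasks into chunks of at most batch_size items before handing each chunk to read_and_insert. The helper name iter_batches is hypothetical and not part of nemo_curator, and whether the executor batches tasks this way is an assumption, not something this page confirms.

```python
from collections.abc import Iterator, Sequence
from typing import TypeVar

T = TypeVar("T")


def iter_batches(tasks: Sequence[T], batch_size: int) -> Iterator[list[T]]:
    """Yield consecutive chunks of at most `batch_size` tasks.

    Hypothetical helper: `batch_size` stands in for the value returned by
    get_batch_size(); the real executor may group tasks differently.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(tasks), batch_size):
        # The final chunk may be shorter than batch_size.
        yield list(tasks[start:start + batch_size])
```

Each yielded list could then be passed as the tasks argument of read_and_insert.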

nemo_curator.backends.experimental.ray_actor_pool.shuffle_adapter.ShuffleStageAdapter.insert_finished() -> None

Finish the insertion phase and trigger shuffle.

nemo_curator.backends.experimental.ray_actor_pool.shuffle_adapter.ShuffleStageAdapter.read_and_insert(
tasks: list[nemo_curator.tasks.FileGroupTask],
band_range: tuple[int, int] | None = None
) -> list[nemo_curator.tasks.FileGroupTask]

Read and insert tasks into the shuffler.

nemo_curator.backends.experimental.ray_actor_pool.shuffle_adapter.ShuffleStageAdapter.setup(
root_address: bytes,
worker_metadata: nemo_curator.backends.base.WorkerMetadata | None = None
) -> None

Set up the shuffle workers and the stage.

nemo_curator.backends.experimental.ray_actor_pool.shuffle_adapter.ShuffleStageAdapter.setup_on_node() -> None

Note: This method is not used in the current implementation since we use the Ray Data pattern of calling setup_on_node before actor creation.

nemo_curator.backends.experimental.ray_actor_pool.shuffle_adapter.ShuffleStageAdapter.setup_root() -> None

Set up the root actor.

nemo_curator.backends.experimental.ray_actor_pool.shuffle_adapter.ShuffleStageAdapter.setup_worker(
root_address: bytes
) -> None

Set up UCXX communication.

nemo_curator.backends.experimental.ray_actor_pool.shuffle_adapter.ShuffleStageAdapter.teardown() -> None

Clean up resources.
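The method descriptions above suggest a phase order: set up, insert, signal that insertion is finished, extract and write, then tear down. The sketch below walks a stand-in class through that order. FakeShuffleAdapter and run_shuffle are hypothetical names used only for illustration. They mirror the documented method names but need no Ray, UCXX, or GPU, and the round-robin distribution of batches is an assumption rather than the executor's actual scheduling.

```python
from __future__ import annotations


class FakeShuffleAdapter:
    """Hypothetical stand-in mirroring the documented method names.

    This is NOT the real ShuffleStageAdapter; it only records the order
    in which its methods are called.
    """

    def __init__(self, rank: int, nranks: int) -> None:
        self.rank = rank
        self.nranks = nranks
        self.calls: list[str] = []
        self.inserted: list[str] = []

    def setup(self, root_address: bytes) -> None:
        self.calls.append("setup")

    def read_and_insert(self, tasks: list[str]) -> list[str]:
        self.calls.append("read_and_insert")
        self.inserted.extend(tasks)
        return tasks

    def insert_finished(self) -> None:
        self.calls.append("insert_finished")

    def extract_and_write(self) -> list[str]:
        self.calls.append("extract_and_write")
        return [f"rank{self.rank}-output"]

    def teardown(self) -> None:
        self.calls.append("teardown")


def run_shuffle(
    adapters: list[FakeShuffleAdapter],
    batches: list[list[str]],
    root_address: bytes = b"fake-address",
) -> list[str]:
    """Drive every adapter through the assumed phase order."""
    for adapter in adapters:
        adapter.setup(root_address)
    # Insertion phase: batches are spread round-robin (an assumption).
    for i, batch in enumerate(batches):
        adapters[i % len(adapters)].read_and_insert(batch)
    # Every rank signals the end of insertion before any extraction starts.
    for adapter in adapters:
        adapter.insert_finished()
    outputs: list[str] = []
    for adapter in adapters:
        outputs.extend(adapter.extract_and_write())
    for adapter in adapters:
        adapter.teardown()
    return outputs
```

Calling run_shuffle with two stand-in adapters and three batches sends the first and third batch to rank 0 and the second to rank 1, and each adapter's calls list records the phases in the order shown.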