backends.experimental.ray_actor_pool.shuffle_adapter

Module Contents

Classes

ShuffleStageAdapter

Ray actor that wraps a shuffle stage and its actor.

API

class backends.experimental.ray_actor_pool.shuffle_adapter.ShuffleStageAdapter(
stage: ShuffleStage | LSHStage,
rank: int,
nranks: int,
num_input_tasks: int | None = None,
)

Bases: nemo_curator.backends.base.BaseStageAdapter

Ray actor that wraps a shuffle stage and its actor.

This adapter manages the lifecycle of a shuffle actor (like LSHActor) and provides a uniform interface for the executor.
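The lifecycle described above can be pictured with a small, self-contained sketch. It uses a hypothetical StubShuffleAdapter in place of the real Ray actor and a hypothetical run_shuffle driver. The phase order (insert on every rank, then insert_finished on every rank, then extract_and_write, with teardown always run) is an assumption inferred from the method descriptions on this page, not the executor's actual code. A real executor would presumably issue these as Ray remote calls and wait for each phase to finish on all ranks before starting the next; the sketch runs them sequentially so it stays runnable without Ray.

```python
from dataclasses import dataclass, field


@dataclass
class StubShuffleAdapter:
    """Hypothetical stand-in for ShuffleStageAdapter that records each call."""

    rank: int
    nranks: int
    calls: list[str] = field(default_factory=list)

    def read_and_insert(self, tasks, band_range=None):
        self.calls.append("read_and_insert")
        return tasks

    def insert_finished(self):
        self.calls.append("insert_finished")

    def extract_and_write(self):
        self.calls.append("extract_and_write")
        return [f"output-rank{self.rank}"]

    def teardown(self):
        self.calls.append("teardown")


def run_shuffle(adapters, tasks_per_rank):
    """Hypothetical driver: insert -> shuffle -> extract, then always clean up."""
    outputs = []
    try:
        # Phase 1 (assumed): every rank inserts its share of the input.
        for adapter, tasks in zip(adapters, tasks_per_rank):
            adapter.read_and_insert(tasks)
        # Phase 2 (assumed): once all inserts are done, each rank triggers the shuffle.
        for adapter in adapters:
            adapter.insert_finished()
        # Phase 3: each rank extracts its shuffled partition and writes it out.
        for adapter in adapters:
            outputs.extend(adapter.extract_and_write())
    finally:
        # Release resources even if one of the phases raised.
        for adapter in adapters:
            adapter.teardown()
    return outputs
```

For two ranks, `run_shuffle([StubShuffleAdapter(rank=r, nranks=2) for r in range(2)], [["a"], ["b"]])` returns `["output-rank0", "output-rank1"]`. Calling teardown from a finally block ensures resources are released even when a phase fails.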

Initialization

Initialize the adapter.

Args:
    stage: The shuffle stage to wrap (a ShuffleStage or LSHStage).
    rank: This actor’s rank in the group.
    nranks: Total number of actors in the group.
    num_input_tasks: Total number of input tasks (partitions). Optional; defaults to None.
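One way an executor might use rank and nranks is to give each actor a disjoint share of the input. The round-robin split below is a hypothetical illustration (tasks_for_rank is not part of the library), assuming ranks run from 0 to nranks - 1.

```python
def tasks_for_rank(tasks, rank, nranks):
    """Hypothetical helper: the round-robin share of `tasks` owned by `rank`."""
    if nranks <= 0:
        raise ValueError("nranks must be positive")
    if not 0 <= rank < nranks:
        raise ValueError(f"rank must be in [0, {nranks}), got {rank}")
    # Rank r takes tasks r, r + nranks, r + 2 * nranks, ...
    return tasks[rank::nranks]
```

With seven tasks and three ranks, ranks 0, 1 and 2 receive `[0, 3, 6]`, `[1, 4]` and `[2, 5]`, so every task is assigned exactly once.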

extract_and_write() -> list[nemo_curator.tasks.FileGroupTask]

Extract shuffled data and write to output files.

get_batch_size() -> int

Get the batch size for this stage.
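get_batch_size() reports the stage's batch size. Assuming the caller groups its task list into chunks of that size before calling read_and_insert, the grouping could look like the hypothetical batched helper below. On Python 3.12+, itertools.batched does the same job but yields tuples.

```python
from collections.abc import Iterator, Sequence
from typing import TypeVar

T = TypeVar("T")


def batched(tasks: Sequence[T], batch_size: int) -> Iterator[list[T]]:
    """Hypothetical helper: yield consecutive batches of at most `batch_size` tasks."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(tasks), batch_size):
        # The final batch may be shorter than batch_size.
        yield list(tasks[start : start + batch_size])
```

`list(batched(["t0", "t1", "t2", "t3", "t4"], 2))` returns `[["t0", "t1"], ["t2", "t3"], ["t4"]]`.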

insert_finished() -> None

Finish the insertion phase and trigger the shuffle.
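insert_finished() marks the point where the inserted data is redistributed between ranks. As a generic illustration of what a shuffle does (not the library's implementation, and without the UCXX communication the adapter sets up), the sketch below hash-partitions (key, value) records so that all records sharing a key land on the same rank. The functions partition_for and shuffle_records are hypothetical.

```python
import hashlib


def partition_for(key: str, nranks: int) -> int:
    """Hypothetical helper: map a key to a destination rank with a stable hash."""
    # hashlib returns the same digest in every process, unlike the built-in
    # hash(), which is salted per interpreter for str keys.
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % nranks


def shuffle_records(records, nranks):
    """Hypothetical helper: group (key, value) records into one bucket per rank."""
    buckets = [[] for _ in range(nranks)]
    for key, value in records:
        buckets[partition_for(key, nranks)].append((key, value))
    return buckets
```

A stable hash is used instead of the built-in hash() because ranks running in different processes must agree on where each key goes.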

read_and_insert(
tasks: list[nemo_curator.tasks.FileGroupTask],
band_range: tuple[int, int] | None = None,
) -> list[nemo_curator.tasks.FileGroupTask]

Read and insert tasks into the shuffler.
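For an LSHStage, the optional band_range argument presumably limits insertion to a contiguous subset of LSH bands, for example so that the bands can be processed in several smaller passes. That reading, and the half-open [start, end) convention used here, are assumptions; split_band_ranges is a hypothetical helper showing how such ranges could be generated.

```python
def split_band_ranges(num_bands: int, num_chunks: int) -> list[tuple[int, int]]:
    """Hypothetical helper: split bands [0, num_bands) into near-equal half-open ranges."""
    if num_bands < 0 or num_chunks < 1:
        raise ValueError("need num_bands >= 0 and num_chunks >= 1")
    base, extra = divmod(num_bands, num_chunks)
    ranges = []
    start = 0
    for i in range(num_chunks):
        # The first `extra` chunks take one additional band each.
        end = start + base + (1 if i < extra else 0)
        ranges.append((start, end))
        start = end
    return ranges
```

`split_band_ranges(10, 3)` returns `[(0, 4), (4, 7), (7, 10)]`.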

setup(
root_address: bytes,
worker_metadata: WorkerMetadata | None = None,
) -> None

Set up the shuffle workers and the stage.

setup_on_node() -> None

Note: This method is not used in the current implementation because, following the Ray Data pattern, setup_on_node is called before the actor is created.

setup_root() -> None

Set up the root actor.

setup_worker(root_address: bytes) -> None

Set up UCXX communication.

teardown() -> None

Clean up resources.