backends.experimental.ray_actor_pool.shuffle_adapter
Module Contents
Classes
ShuffleStageAdapter | Ray actor that wraps a shuffle stage and its actor.
API
- class backends.experimental.ray_actor_pool.shuffle_adapter.ShuffleStageAdapter(stage: ShuffleStage | LSHStage, rank: int, nranks: int, num_input_tasks: int | None = None)
Bases: nemo_curator.backends.base.BaseStageAdapter
Ray actor that wraps a shuffle stage and its actor.
This adapter manages the lifecycle of a shuffle actor (like LSHActor) and provides a uniform interface for the executor.
Initialization
Initialize the adapter.
Args:
    stage: The shuffle stage to wrap.
    rank: This actor’s rank in the group.
    nranks: Total number of actors in the group.
    session_id: Unique session identifier.
    input_nparts: Total input partitions.
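To make rank and nranks concrete: in a hash shuffle, every record is routed to exactly one of the nranks actors, and each actor owns the records whose destination equals its own rank. The sketch below is a generic illustration under that assumption, not the partitioning logic of ShuffleStageAdapter or LSHActor; `destination_rank` and `partition_by_rank` are hypothetical helpers, and keys are plain strings.

```python
import zlib
from collections import defaultdict


def destination_rank(key: str, nranks: int) -> int:
    """Hypothetical helper: map a key to a rank in [0, nranks).

    Uses zlib.crc32 rather than hash(), because Python randomizes str hashes
    per process and every actor must agree on the same mapping.
    """
    if nranks <= 0:
        raise ValueError("nranks must be positive")
    return zlib.crc32(key.encode("utf-8")) % nranks


def partition_by_rank(keys: list[str], nranks: int) -> dict[int, list[str]]:
    """Hypothetical helper: group keys by the rank that owns them after the shuffle."""
    parts: dict[int, list[str]] = defaultdict(list)
    for key in keys:
        parts[destination_rank(key, nranks)].append(key)
    return dict(parts)
```

For example, `partition_by_rank(["doc-a", "doc-b", "doc-c"], nranks=2)` returns a dict whose keys are a subset of {0, 1} and whose lists together contain each key exactly once. A deterministic hash matters because every rank computes the destination of a key independently.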
- extract_and_write() -> list[nemo_curator.tasks.FileGroupTask]
Extract shuffled data and write to output files.
- get_batch_size() -> int
Get the batch size for this stage.
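An executor driving this adapter plausibly uses the value from get_batch_size() to decide how many tasks to hand to each read_and_insert call. The following is a minimal sketch under that assumption; `chunk_tasks` is a hypothetical helper, and plain strings stand in for FileGroupTask objects.

```python
from collections.abc import Sequence
from typing import TypeVar

T = TypeVar("T")


def chunk_tasks(tasks: Sequence[T], batch_size: int) -> list[list[T]]:
    """Hypothetical helper: split tasks into consecutive batches of at most batch_size."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [list(tasks[i : i + batch_size]) for i in range(0, len(tasks), batch_size)]
```

For example, `chunk_tasks(["t0", "t1", "t2", "t3", "t4"], 2)` returns `[["t0", "t1"], ["t2", "t3"], ["t4"]]`; the last batch is simply shorter when the task count is not a multiple of the batch size.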
- insert_finished() -> None
Finish the insertion phase and trigger the shuffle.
- read_and_insert(tasks: list[nemo_curator.tasks.FileGroupTask], band_range: tuple[int, int] | None = None)
Read and insert tasks into the shuffler.
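When the wrapped stage is an LSHStage, band_range presumably restricts an insertion pass to a subset of LSH bands. The sketch below shows standard MinHash banding under two assumptions this page does not confirm: band_range is a half-open (start, stop) interval of band indices, and a signature of length num_bands × rows_per_band is split into contiguous bands. `select_bands` is a hypothetical helper.

```python
from __future__ import annotations


def select_bands(
    signature: list[int],
    num_bands: int,
    band_range: tuple[int, int] | None = None,
) -> list[tuple[int, tuple[int, ...]]]:
    """Hypothetical helper: split a MinHash signature into bands, keep those in range.

    Assumes band_range is half-open [start, stop); None keeps every band.
    Returns (band_id, band_values) pairs. In LSH, documents that share the
    same band_values for some band_id become candidate duplicates.
    """
    if num_bands <= 0 or len(signature) % num_bands != 0:
        raise ValueError("signature length must be a multiple of num_bands")
    rows = len(signature) // num_bands
    start, stop = band_range if band_range is not None else (0, num_bands)
    if not 0 <= start <= stop <= num_bands:
        raise ValueError("band_range must lie within [0, num_bands]")
    return [(b, tuple(signature[b * rows : (b + 1) * rows])) for b in range(start, stop)]
```

For example, `select_bands([1, 2, 3, 4, 5, 6], num_bands=3, band_range=(1, 3))` returns `[(1, (3, 4)), (2, (5, 6))]`. Processing bands in ranges like this lets a large job be shuffled a few bands at a time instead of all at once.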
- setup(root_address: bytes, worker_metadata: WorkerMetadata | None = None)
Set up the shuffle workers and the stage.
- setup_on_node() -> None
Note: This method is not used in the current implementation since we use the Ray Data pattern of calling setup_on_node before actor creation.
- setup_root() -> None
Set up the root actor.
- setup_worker(root_address: bytes) -> None
Set up UCXX communication.
- teardown() -> None
Clean up resources.
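The methods documented here suggest a phased lifecycle: establish communication, insert data, finish insertion to trigger the shuffle, extract the results, then tear down. The in-memory toy below mimics only that call order so the phases are easy to follow. It is not the real adapter: there is no Ray, UCXX, or file I/O, and the exact order in which the executor invokes these methods is an assumption. `ToyShuffleAdapter`, its `connect` method (standing in for setup_root, setup_worker, and setup), and the `run_shuffle` driver are all hypothetical; records are plain strings rather than FileGroupTask objects.

```python
from __future__ import annotations

import zlib
from collections import defaultdict


class ToyShuffleAdapter:
    """Hypothetical in-memory stand-in that mirrors the adapter's phase order."""

    def __init__(self, rank: int, nranks: int) -> None:
        self.rank = rank
        self.nranks = nranks
        self.peers: list[ToyShuffleAdapter] = []
        self._outbox: dict[int, list[str]] = defaultdict(list)  # dest rank -> records
        self._inbox: list[str] = []  # records this rank owns after the shuffle

    def connect(self, peers: list[ToyShuffleAdapter]) -> None:
        # Stands in for setup_root/setup_worker/setup: learn how to reach every rank.
        self.peers = peers

    def read_and_insert(self, records: list[str]) -> None:
        # Buffer each record for the rank that owns its key (stable hash).
        for rec in records:
            self._outbox[zlib.crc32(rec.encode("utf-8")) % self.nranks].append(rec)

    def insert_finished(self) -> None:
        # End of the insertion phase: deliver buffered records to their owners.
        for dest, recs in self._outbox.items():
            self.peers[dest]._inbox.extend(recs)
        self._outbox.clear()

    def extract_and_write(self) -> list[str]:
        # The real method writes files and returns FileGroupTasks; the toy
        # returns the records this rank received, sorted for determinism.
        return sorted(self._inbox)

    def teardown(self) -> None:
        self._inbox.clear()
        self.peers = []


def run_shuffle(inputs: list[list[str]]) -> list[list[str]]:
    """Drive one toy adapter per rank through the assumed phase order."""
    nranks = len(inputs)
    adapters = [ToyShuffleAdapter(r, nranks) for r in range(nranks)]
    for a in adapters:
        a.connect(adapters)
    for a, records in zip(adapters, inputs):
        a.read_and_insert(records)
    for a in adapters:  # every rank finishes inserting before any extraction
        a.insert_finished()
    outputs = [a.extract_and_write() for a in adapters]
    for a in adapters:
        a.teardown()
    return outputs
```

For example, `run_shuffle([["a", "b", "c"], ["d", "e"], ["f"]])` returns three lists that together contain each record exactly once, with every record placed on the rank its hash selects. The barrier between insert_finished and extract_and_write is the key design point: no rank can know its final data until all ranks have finished sending.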