nemo_rl.distributed.worker_groups#
Module Contents#
Classes#
| Class | Description |
| --- | --- |
| MultiWorkerFuture | Container for Ray futures with associated worker information. |
| RayWorkerBuilder | |
| RayWorkerGroup | Manages a group of distributed Ray worker/actor processes that execute tasks in parallel. |
API#
- class nemo_rl.distributed.worker_groups.MultiWorkerFuture[source]#
Container for Ray futures with associated worker information.
- futures: List[ray.ObjectRef]#
None
- used_workers: List[int]#
None
- respect_tied_workers: bool#
True
- get_results(worker_group)[source]#
Get results from the futures, optionally respecting tied workers.
When respect_tied_workers is True, this method deduplicates results by returning only one result per tied worker group.
The method uses worker_group.worker_to_tied_group_index to identify which tied worker group each worker belongs to, then selects only the first result from each group.
- Parameters:
worker_group – The RayWorkerGroup that created this bundle
- Returns:
List of results, deduplicated by tied workers if respect_tied_workers is True
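A minimal usage sketch follows; `worker_group`, `data_slices`, and the `"evaluate"` method name are illustrative placeholders, not part of this module:
```python
# Sketch only: `worker_group` is a RayWorkerGroup, `data_slices` is a list of
# SlicedDataDict shards, and "evaluate" is a method assumed to exist on the
# worker actors.
bundle = worker_group.run_all_workers_multiple_data("evaluate", data=data_slices)

# Default behavior (respect_tied_workers=True): one result per tied worker group.
per_group_results = bundle.get_results(worker_group)

# Keep every worker's result instead by disabling deduplication before resolving.
bundle.respect_tied_workers = False
per_worker_results = bundle.get_results(worker_group)
```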
- class nemo_rl.distributed.worker_groups.RayWorkerBuilder(ray_actor_class_fqn: str, *args, **kwargs)[source]#
Initialization
- class IsolatedWorkerInitializer(ray_actor_class_fqn: str, *init_args, **init_kwargs)#
Initialization
- create_worker(placement_group: ray.util.placement_group.PlacementGroup, placement_group_bundle_index: int, num_gpus: int, bundle_indices: Optional[tuple] = None, **extra_options: Optional[Dict[str, Any]])#
Create a Ray worker with the specified configuration.
Order of precedence for worker options configuration (from lowest to highest):
1. Options passed by the user to __call__ (extra_options)
2. Options required by the worker via configure_worker (may override user options with a warning)
3. Options set by RayWorkerBuilder.__call__ (specifically the scheduling strategy)
If the worker needs to override user-provided options, it should log a warning to inform the user about the change and the reason for it.
- Parameters:
placement_group – Ray placement group for resource allocation
placement_group_bundle_index – Index of the bundle in the placement group
num_gpus – Number of GPUs to allocate to this worker
bundle_indices – Tuple of (node_idx, local_bundle_indices) for tensor parallelism (if applicable)
extra_options – Additional options to pass to the Ray actor (may be overridden by actor’s configure_worker(…) method)
- Returns:
A Ray actor reference to the created worker
- __call__(placement_group: ray.util.placement_group.PlacementGroup, placement_group_bundle_index: int, num_gpus: int, bundle_indices: Optional[tuple] = None, **extra_options: Dict[str, Any])#
Create a Ray worker with the specified configuration.
Order of precedence for worker options configuration (from lowest to highest):
1. Options passed by the user to __call__ (extra_options)
2. Options required by the worker via configure_worker (may override user options with a warning)
3. Options set by RayWorkerBuilder.__call__ (specifically the scheduling strategy)
If the worker needs to override user-provided options, it should log a warning to inform the user about the change and the reason for it.
- Parameters:
placement_group – Ray placement group for resource allocation
placement_group_bundle_index – Index of the bundle in the placement group
num_gpus – Number of GPUs to allocate to this worker
bundle_indices – Tuple of (node_idx, local_bundle_indices) for tensor parallelism (if applicable)
extra_options – Additional options to pass to the Ray actor (may be overridden by actor’s configure_worker(…) method)
- Returns:
A Ray actor reference to the created worker
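The sketch below shows one way the builder might be invoked directly; the actor class FQN, the `model_name` keyword, and the hand-built placement group are assumptions for illustration (in NeMo RL, placement is normally handled by RayVirtualCluster and RayWorkerGroup rather than by hand):
```python
import ray
from ray.util.placement_group import placement_group
from nemo_rl.distributed.worker_groups import RayWorkerBuilder

ray.init()

# Placeholder FQN and constructor kwarg for a worker class importable in your
# environment.
builder = RayWorkerBuilder("my_project.workers.PolicyWorker", model_name="llama")

# A single bundle with one GPU, created by hand only for this sketch.
pg = placement_group([{"GPU": 1, "CPU": 1}])
ray.get(pg.ready())

# Create the worker actor on bundle 0 of the placement group.
worker = builder(
    placement_group=pg,
    placement_group_bundle_index=0,
    num_gpus=1,
)
```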
- class nemo_rl.distributed.worker_groups.RayWorkerGroup(cluster: nemo_rl.distributed.virtual_cluster.RayVirtualCluster, remote_worker_builder: nemo_rl.distributed.worker_groups.RayWorkerBuilder, workers_per_node: Optional[Union[int, List[int]]] = None, name_prefix: str = '', bundle_indices_list: Optional[List[tuple]] = None)[source]#
Manages a group of distributed Ray worker/actor processes that execute tasks in parallel.
This class creates and manages Ray actor instances that run on resources allocated by a RayVirtualCluster. It handles:
- Worker creation and placement on specific GPU resources
- Setting up distributed training environment variables (rank, world size, etc.)
- Executing methods across all workers in parallel
- Collecting and aggregating results
- Support for tied worker groups where multiple workers process the same data
Initialization
Initialize a group of distributed Ray workers.
- Parameters:
cluster – The RayVirtualCluster that provides the placement groups and resources on which workers are scheduled
remote_worker_builder – Callable that launches a Ray worker and has updatable options
workers_per_node – Defaults to launching one worker per bundle in the cluster. Alternatively, specify an int or a per-node list to launch a different number of workers per node.
name_prefix – Optional prefix for the names of the workers
bundle_indices_list – Explicit list of (node_idx, [local_bundle_indices]) tuples. Each tuple defines a tied group of workers placed on the same node. If provided, workers_per_node is ignored.
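A hedged construction sketch follows; the RayVirtualCluster constructor arguments and the worker class FQN are illustrative assumptions (see the nemo_rl.distributed.virtual_cluster documentation for the actual constructor):
```python
from nemo_rl.distributed.virtual_cluster import RayVirtualCluster
from nemo_rl.distributed.worker_groups import RayWorkerBuilder, RayWorkerGroup

# The cluster arguments here are assumptions for illustration only.
cluster = RayVirtualCluster(bundle_ct_per_node_list=[8], use_gpus=True)

# Placeholder FQN for a worker class importable in your environment.
builder = RayWorkerBuilder("my_project.workers.PolicyWorker")

# One worker per bundle in the cluster (the default), with a name prefix so
# the actors are easy to identify in the Ray dashboard.
worker_group = RayWorkerGroup(
    cluster=cluster,
    remote_worker_builder=builder,
    name_prefix="policy",
)
```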
- _create_workers_from_bundle_indices(remote_worker_builder, bundle_indices_list)#
Create workers based on explicit bundle indices for tied worker groups.
- Parameters:
remote_worker_builder – Builder function for Ray actors
bundle_indices_list – List of (node_idx, local_bundle_indices) tuples, where each tuple specifies a tied group with its node and local bundle indices.
- property workers#
- property worker_metadata#
- property group_count#
Number of tied worker groups.
- run_all_workers_multiple_data(method_name: str, data: List[nemo_rl.distributed.batched_data_dict.SlicedDataDict], common_kwargs: Optional[Dict[str, Any]] = None, only_on: Literal['all', 'tied_leader', 'all_tied_workers'] = 'all')#
Run a method on all workers in parallel with different data.
- Parameters:
method_name – Name of the method to call on each worker
data – List of data slices to pass to workers/groups
common_kwargs – Additional keyword arguments to pass to all workers
only_on –
Determines which workers receive data and execute the method:
"all": Each worker gets its own data slice
"tied_leader": Only the first worker in each tied group receives data
"all_tied_workers": All workers in each tied group receive the same data slice
- Returns:
Object containing futures and their associated worker information
- Return type:
MultiWorkerFuture
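A hedged usage sketch follows; `worker_group`, `data_slices`, the `"train_step"` method, and the `learning_rate` keyword are illustrative assumptions:
```python
# Sketch only: `data_slices` is assumed to be a pre-sharded list of
# SlicedDataDict objects, one per tied worker group.
bundle = worker_group.run_all_workers_multiple_data(
    "train_step",                           # method assumed to exist on the workers
    data=data_slices,
    common_kwargs={"learning_rate": 1e-5},  # broadcast to every worker
    only_on="all_tied_workers",             # each tied group shares one data slice
)

# Collect results; when bundle.respect_tied_workers is True, only one result
# per tied worker group is returned.
results = worker_group.get_all_worker_results(bundle)
```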
- run_all_workers_single_data(method_name: str, *args, only_on: Literal['all', 'tied_leader', 'all_tied_workers'] = 'all', **kwargs)#
Run a method on all workers in parallel with the same data.
- Parameters:
method_name – Name of the method to call on each worker
only_on –
Determines which workers to run the method on:
"all": Run on all workers
"tied_leader": Run only on the first worker of each tied worker group
"all_tied_workers": Run on all workers in each tied worker group
*args – Positional arguments to pass to the method
**kwargs – Keyword arguments to pass to the method
- Returns:
A list of ray futures
- Return type:
List[ray.ObjectRef]
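A usage sketch follows; `worker_group`, the `"save_checkpoint"` method, and its path argument are hypothetical:
```python
import ray

# Sketch only: "save_checkpoint" is assumed to be defined on the worker actors.
futures = worker_group.run_all_workers_single_data(
    "save_checkpoint",
    "/tmp/checkpoints/step_100",
    only_on="tied_leader",  # only the first worker of each tied group runs it
)

# The return value is a plain list of Ray futures, so ray.get resolves them.
results = ray.get(futures)
```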
- get_all_worker_results(future_bundle)[source]#
Get results from all workers, optionally filtering to get just one result per tied worker group.
- Parameters:
future_bundle – MultiWorkerFuture containing futures and worker information. When future_bundle.respect_tied_workers is True, only results from the leaders of tied worker groups are returned.
- Returns:
List of results, deduplicated as specified in the future_bundle
- shutdown(cleanup_method: Optional[str] = None, timeout: Optional[float] = 30.0, force: bool = False)#
Shut down all workers in the worker group.
- Parameters:
cleanup_method – Optional method name to call on each worker before termination. If provided, this method will be called on each worker to allow for graceful cleanup.
timeout – Timeout in seconds for graceful shutdown. Only applicable if cleanup_method is provided. If None, wait indefinitely for workers to complete their cleanup.
force – If True, forcefully terminate workers with ray.kill() even if cleanup_method is provided. If cleanup_method is None, workers are always forcefully terminated.
- Returns:
True if all workers were successfully shut down
- Return type:
bool
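A shutdown sketch follows; the `"cleanup"` method name is a hypothetical method assumed to be implemented by the worker actors:
```python
# Give each worker up to 60 seconds to run its (assumed) "cleanup" method
# before termination.
ok = worker_group.shutdown(cleanup_method="cleanup", timeout=60.0)

# Or skip graceful cleanup entirely and terminate the actors immediately:
# worker_group.shutdown(force=True)
```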