nemo_rl.distributed.worker_groups#

Module Contents#

Classes#

MultiWorkerFuture

Container for Ray futures with associated worker information.

RayWorkerBuilder

RayWorkerGroup

Manages a group of distributed Ray worker/actor processes that execute tasks in parallel.

API#

class nemo_rl.distributed.worker_groups.MultiWorkerFuture[source]#

Container for Ray futures with associated worker information.

futures: List[ray.ObjectRef]#

None

used_workers: List[int]#

None

respect_tied_workers: bool#

True

get_results(worker_group)[source]#

Get results from the futures, optionally respecting tied workers.

When respect_tied_workers is True, this method deduplicates results by returning only one result per tied worker group.

The method uses worker_group.worker_to_tied_group_index to identify which tied worker group each worker belongs to, then selects only the first result from each group.

Parameters:

worker_group – The RayWorkerGroup that created this bundle

Returns:

List of results, deduplicated by tied workers if respect_tied_workers is True
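
A minimal consumption sketch: a MultiWorkerFuture is normally created for you by a RayWorkerGroup call (see run_all_workers_multiple_data below), and get_results resolves its futures against that group. The names bundle and worker_group below are assumed placeholders.

```python
# `bundle` is a MultiWorkerFuture returned by a RayWorkerGroup method and
# `worker_group` is the RayWorkerGroup that produced it (both assumed here).
results = bundle.get_results(worker_group)

# With respect_tied_workers=True (the default), one result is returned per
# tied worker group rather than one per worker.
assert len(results) <= len(bundle.used_workers)
```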

class nemo_rl.distributed.worker_groups.RayWorkerBuilder(ray_actor_class_fqn: str, *args, **kwargs)[source]#

Initialization

class IsolatedWorkerInitializer(
ray_actor_class_fqn: str,
*init_args,
**init_kwargs,
)[source]#

Initialization

create_worker(
placement_group: ray.util.placement_group.PlacementGroup,
placement_group_bundle_index: int,
num_gpus: int,
bundle_indices: Optional[tuple] = None,
**extra_options: Optional[Dict[str, Any]],
)[source]#

Create a Ray worker with the specified configuration.

Order of precedence for worker options configuration (from lowest to highest):

  1. Options passed by the user to __call__ (extra_options)

  2. Options required by the worker via configure_worker (may override user options with warning)

  3. Options set by RayWorkerBuilder.__call__ (specifically the scheduling strategy)

If the worker needs to override user-provided options, it should log a warning to inform the user about the change and the reason for it.

Parameters:
  • placement_group – Ray placement group for resource allocation

  • placement_group_bundle_index – Index of the bundle in the placement group

  • num_gpus – Number of GPUs to allocate to this worker

  • bundle_indices – Tuple of (node_idx, local_bundle_indices) for tensor parallelism (if applicable)

  • extra_options – Additional options to pass to the Ray actor (may be overridden by actor’s configure_worker(…) method)

Returns:

A Ray actor reference to the created worker

__call__(
placement_group: ray.util.placement_group.PlacementGroup,
placement_group_bundle_index: int,
num_gpus: int,
bundle_indices: Optional[tuple] = None,
**extra_options: Dict[str, Any],
)[source]#

Create a Ray worker with the specified configuration.

Order of precedence for worker options configuration (from lowest to highest):

  1. Options passed by the user to __call__ (extra_options)

  2. Options required by the worker via configure_worker (may override user options with warning)

  3. Options set by RayWorkerBuilder.__call__ (specifically the scheduling strategy)

If the worker needs to override user-provided options, it should log a warning to inform the user about the change and the reason for it.

Parameters:
  • placement_group – Ray placement group for resource allocation

  • placement_group_bundle_index – Index of the bundle in the placement group

  • num_gpus – Number of GPUs to allocate to this worker

  • bundle_indices – Tuple of (node_idx, local_bundle_indices) for tensor parallelism (if applicable)

  • extra_options – Additional options to pass to the Ray actor (may be overridden by actor’s configure_worker(…) method)

Returns:

A Ray actor reference to the created worker
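
A minimal sketch of building a worker directly through RayWorkerBuilder, assuming Ray is already initialized and a single-GPU placement group is available; the actor class name "my_pkg.workers.PolicyWorker" and its learning_rate argument are hypothetical.

```python
import ray
from ray.util.placement_group import placement_group
from nemo_rl.distributed.worker_groups import RayWorkerBuilder

ray.init()

# Reserve one GPU bundle; the resource shape and strategy are illustrative only.
pg = placement_group([{"GPU": 1, "CPU": 8}], strategy="PACK")
ray.get(pg.ready())

# Positional/keyword args after the class name are forwarded to the actor's __init__.
builder = RayWorkerBuilder("my_pkg.workers.PolicyWorker", learning_rate=1e-4)

# Place the worker on bundle 0 of the placement group with one GPU.
worker = builder(
    placement_group=pg,
    placement_group_bundle_index=0,
    num_gpus=1,
)
```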

class nemo_rl.distributed.worker_groups.RayWorkerGroup(
cluster: nemo_rl.distributed.virtual_cluster.RayVirtualCluster,
remote_worker_builder: nemo_rl.distributed.worker_groups.RayWorkerBuilder,
workers_per_node: Optional[Union[int, List[int]]] = None,
name_prefix: str = '',
bundle_indices_list: Optional[List[tuple]] = None,
)[source]#

Manages a group of distributed Ray worker/actor processes that execute tasks in parallel.

This class creates and manages Ray actor instances that run on resources allocated by a RayVirtualCluster. It handles:

  • Worker creation and placement on specific GPU resources

  • Setting up distributed training environment variables (rank, world size, etc.)

  • Executing methods across all workers in parallel

  • Collecting and aggregating results

  • Support for tied worker groups where multiple workers process the same data

Initialization

Initialize a group of distributed Ray workers.

Parameters:
  • cluster – RayVirtualCluster

  • remote_worker_builder – Callable that launches a Ray worker and has updatable options

  • workers_per_node – Defaults to launching one worker per bundle in the cluster. Alternatively, specify an int or a list to launch a different number of workers per node.

  • name_prefix – Optional prefix for the names of the workers

  • bundle_indices_list – Explicit list of (node_idx, [local_bundle_indices]) tuples. Each tuple defines a tied group of workers placed on the same node. If provided, workers_per_node is ignored.
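
A minimal construction sketch, assuming a RayVirtualCluster has been created elsewhere (its constructor is not documented here) and reusing the hypothetical actor class from the previous example.

```python
from nemo_rl.distributed.worker_groups import RayWorkerBuilder, RayWorkerGroup

cluster = ...  # an existing nemo_rl.distributed.virtual_cluster.RayVirtualCluster (assumed)
builder = RayWorkerBuilder("my_pkg.workers.PolicyWorker")  # hypothetical actor FQN

# workers_per_node defaults to one worker per bundle in the cluster.
worker_group = RayWorkerGroup(
    cluster=cluster,
    remote_worker_builder=builder,
    name_prefix="policy",
)
print(worker_group.group_count)  # number of tied worker groups
```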

_create_workers_from_bundle_indices(
remote_worker_builder,
bundle_indices_list,
)[source]#

Create workers based on explicit bundle indices for tied worker groups.

Parameters:
  • remote_worker_builder – Builder function for Ray actors

  • bundle_indices_list – List of (node_idx, local_bundle_indices) tuples, where each tuple specifies a tied group with its node and local bundle indices.

property workers#
property worker_metadata#
property group_count#

Number of tied worker groups.

run_all_workers_multiple_data(
method_name: str,
data: List[nemo_rl.distributed.batched_data_dict.SlicedDataDict],
common_kwargs: Optional[Dict[str, Any]] = None,
only_on: Literal['all', 'tied_leader', 'all_tied_workers'] = 'all',
)[source]#

Run a method on all workers in parallel with different data.

Parameters:
  • method_name – Name of the method to call on each worker

  • data – List of data slices to pass to workers/groups

  • common_kwargs – Additional keyword arguments to pass to all workers

  • only_on

    Determines which workers receive data and execute the method:

    • "all": Each worker gets its own data slice

    • "tied_leader": Only the first worker in each tied group receives data

    • "all_tied_workers": All workers in each tied group receive the same data slice

Returns:

Object containing futures and their associated worker information

Return type:

MultiWorkerFuture
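
A dispatch sketch: "train_step" is a hypothetical worker method, and data_slices is assumed to be a list of SlicedDataDict objects sized to match the targets selected by only_on.

```python
# Send one data slice per tied worker group; with only_on="all_tied_workers",
# every worker in a group receives its group's slice.
future_bundle = worker_group.run_all_workers_multiple_data(
    method_name="train_step",             # hypothetical worker method
    data=data_slices,                     # List[SlicedDataDict], assumed prepared earlier
    common_kwargs={"global_step": step},  # shared kwargs sent to every worker
    only_on="all_tied_workers",
)
# Consume the bundle with get_all_worker_results (see below) or
# future_bundle.get_results(worker_group).
```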

run_all_workers_single_data(
method_name: str,
*args,
only_on: Literal['all', 'tied_leader', 'all_tied_workers'] = 'all',
**kwargs,
)[source]#

Run a method on all workers in parallel with the same data.

Parameters:
  • method_name – Name of the method to call on each worker

  • only_on

    Determines which workers to run the method on:

    • "all": Run on all workers

    • "tied_leader": Run only on the first worker of each tied worker group

    • "all_tied_workers": Run on all workers in each tied worker group

  • *args – Arguments to pass to the method

  • **kwargs – Arguments to pass to the method

Returns:

A list of ray futures

Return type:

List[ray.ObjectRef]
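
A broadcast sketch: "save_checkpoint" and its arguments are hypothetical worker-side details. Unlike run_all_workers_multiple_data, this call returns plain Ray futures, so they can be resolved with ray.get directly.

```python
import ray

# Ask only the leader of each tied worker group to write a checkpoint.
futures = worker_group.run_all_workers_single_data(
    "save_checkpoint",          # hypothetical worker method
    "/tmp/ckpt",                # positional args forwarded to the method
    only_on="tied_leader",
    keep_optimizer_state=True,  # keyword args forwarded to the method
)
results = ray.get(futures)      # futures is a List[ray.ObjectRef]
```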

get_all_worker_results(future_bundle)[source]#

Get results from all workers, optionally filtering to get just one result per tied worker group.

Parameters:

future_bundle – MultiWorkerFuture containing futures and worker information. When future_bundle.respect_tied_workers is True, only results from the leaders of tied worker groups are returned.

Returns:

List of results, deduplicated as specified in the future_bundle
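
Continuing the dispatch sketch above: when the bundle respects tied workers, only the leaders' results come back, so the result count equals the number of tied worker groups.

```python
# `future_bundle` is the MultiWorkerFuture from run_all_workers_multiple_data above.
results = worker_group.get_all_worker_results(future_bundle)

if future_bundle.respect_tied_workers:
    # Deduplicated to one entry per tied worker group.
    assert len(results) == worker_group.group_count
```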

shutdown(
cleanup_method: Optional[str] = None,
timeout: Optional[float] = 30.0,
force: bool = False,
)[source]#

Shutdown all workers in the worker group.

Parameters:
  • cleanup_method – Optional method name to call on each worker before termination. If provided, this method will be called on each worker to allow for graceful cleanup.

  • timeout – Timeout in seconds for graceful shutdown. Only applicable if cleanup_method is provided. If None, wait indefinitely for workers to complete their cleanup.

  • force – If True, forcefully terminate workers with ray.kill() even if cleanup_method is provided. If cleanup_method is None, workers are always forcefully terminated.

Returns:

True if all workers were successfully shut down

Return type:

bool
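
A shutdown sketch: "cleanup" is a hypothetical method exposed by the worker actor, and falling back to force=True is one possible policy rather than required behavior.

```python
# Try a graceful shutdown first, giving workers up to 30 seconds to run their
# cleanup method, then fall back to forceful termination via ray.kill().
ok = worker_group.shutdown(cleanup_method="cleanup", timeout=30.0)
if not ok:
    worker_group.shutdown(force=True)
```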

print_worker_layout()[source]#

Print a visual representation of the worker layout across the virtual cluster.

This shows which workers are assigned to which nodes and GPUs.