core.inference.communication_utils

Module Contents

Functions

is_pipeline_first_stage

Check if the current process is the first stage of the pipeline.

is_pipeline_last_stage

Check if the current process is the last stage of the pipeline.

_is_cuda

Check if a tensor is not None and is on a CUDA device.

_is_cuda_contiguous

Check if a tensor is not None, is on a CUDA device, and is contiguous.

broadcast_from_last_pipeline_stage

Broadcast a tensor from the last pipeline stage to all ranks.

recv_from_prev_pipeline_rank_

Receive from the previous pipeline stage and update the input buffer in place.

send_to_next_pipeline_rank

Send output to the next pipeline stage.

broadcast_tensor

Given the size and dtype of a tensor on all ranks, and the tensor value only on a specific rank, broadcast the tensor from that rank to all other ranks.

broadcast_list

Broadcast a list of values of a given dtype.

broadcast_int_list

Broadcast a list of integer values.

broadcast_float_list

Broadcast a list of float values.

API

core.inference.communication_utils.is_pipeline_first_stage(pp_group: torch.distributed.ProcessGroup)

Check if the current process is the first stage of the pipeline.

core.inference.communication_utils.is_pipeline_last_stage(pp_group: torch.distributed.ProcessGroup)

Check if the current process is the last stage of the pipeline.
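These predicates are typically used to gate per-stage work in an inference loop. A minimal, hypothetical sketch of such gating, assuming torch.distributed is already initialized under a distributed launcher and that the module is importable as megatron.core.inference.communication_utils (both are assumptions, not shown by this reference):

import torch

from megatron.core.inference.communication_utils import (
    is_pipeline_first_stage,
    is_pipeline_last_stage,
)

def run_stage(pp_group: torch.distributed.ProcessGroup) -> None:
    # Hypothetical gating: only the first stage consumes raw inputs,
    # and only the last stage produces logits.
    if is_pipeline_first_stage(pp_group):
        pass  # e.g. embed the input tokens
    if is_pipeline_last_stage(pp_group):
        pass  # e.g. compute logits and sample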

core.inference.communication_utils._is_cuda(tensor)

Check if a tensor is not None and is on a CUDA device.

core.inference.communication_utils._is_cuda_contiguous(tensor)

Check if a tensor is not None, is on a CUDA device, and is contiguous.

core.inference.communication_utils.broadcast_from_last_pipeline_stage(
size: List[int],
dtype: torch.dtype,
tensor: Optional[torch.Tensor] = None,
pp_group: Optional[torch.distributed.ProcessGroup] = None,
)

Broadcast a tensor from the last pipeline stage to all ranks.

Parameters:
  • size – Expected tensor size

  • dtype – Expected tensor dtype

  • tensor – Tensor to broadcast (only on last stage)

  • pp_group – Custom process group (if None, uses global state)
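A minimal sketch of a typical call, assuming torch.distributed is initialized, the module is importable as megatron.core.inference.communication_utils, and the shape and dtype shown are illustrative assumptions:

import torch

from megatron.core.inference.communication_utils import (
    broadcast_from_last_pipeline_stage,
    is_pipeline_last_stage,
)

def share_logits(pp_group: torch.distributed.ProcessGroup) -> torch.Tensor:
    size = [4, 32000]      # assumed logits shape, agreed on by all ranks
    dtype = torch.float32
    logits = None
    if is_pipeline_last_stage(pp_group):
        # Only the last stage holds the real tensor; it must be CUDA-resident.
        logits = torch.randn(size, dtype=dtype, device="cuda")
    # Every rank passes the same size/dtype and gets the tensor back.
    return broadcast_from_last_pipeline_stage(size, dtype, tensor=logits, pp_group=pp_group)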

core.inference.communication_utils.recv_from_prev_pipeline_rank_(
recv_buffer: Optional[torch.Tensor] = None,
pp_group: Optional[torch.distributed.ProcessGroup] = None,
)

Receive from the previous pipeline stage and update the input buffer in place.

Parameters:
  • recv_buffer – Buffer to receive data into

  • pp_group – Custom process group (if None, uses global state)

core.inference.communication_utils.send_to_next_pipeline_rank(
tensor: Optional[torch.Tensor] = None,
pp_group: Optional[torch.distributed.ProcessGroup] = None,
)

Send output to the next pipeline stage.

Parameters:
  • tensor – Tensor to send

  • pp_group – Custom process group (if None, uses global state)
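The receive and send helpers are typically paired to pass activations one hop along the pipeline. A hedged sketch, where forward_step and the hidden-state shape are hypothetical stand-ins and torch.distributed is assumed initialized:

import torch

from megatron.core.inference.communication_utils import (
    is_pipeline_first_stage,
    is_pipeline_last_stage,
    recv_from_prev_pipeline_rank_,
    send_to_next_pipeline_rank,
)

def pipeline_hop(pp_group, hidden_shape, forward_step):
    # Point-to-point buffers must be contiguous CUDA tensors
    # (see _is_cuda_contiguous above).
    recv_buffer = torch.empty(hidden_shape, dtype=torch.float32, device="cuda")
    if not is_pipeline_first_stage(pp_group):
        recv_from_prev_pipeline_rank_(recv_buffer=recv_buffer, pp_group=pp_group)
    output = forward_step(recv_buffer)  # hypothetical per-stage compute
    if not is_pipeline_last_stage(pp_group):
        send_to_next_pipeline_rank(tensor=output.contiguous(), pp_group=pp_group)
    return output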

core.inference.communication_utils.broadcast_tensor(
size,
dtype,
tensor=None,
rank=0,
data_parallel=False,
)

Given the size and dtype of a tensor on all ranks, and the tensor value only on a specific rank, broadcast the tensor from that rank to all other ranks.

Parameters:
  • size – Tensor size, known on all ranks

  • dtype – Tensor dtype, known on all ranks

  • tensor – Tensor value (only on the source rank)

  • rank – Rank to broadcast from

  • data_parallel (bool) – Broadcast across a single data parallel model replica.
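A minimal sketch, assuming torch.distributed is initialized, rank 0 is the source, and the shape and dtype are illustrative:

import torch

from megatron.core.inference.communication_utils import broadcast_tensor

def share_from_rank0() -> torch.Tensor:
    size, dtype = [2, 8], torch.float32  # agreed on by all ranks
    tensor = None
    if torch.distributed.get_rank() == 0:
        tensor = torch.ones(size, dtype=dtype, device="cuda")
    # Non-source ranks pass tensor=None and receive the broadcast value.
    return broadcast_tensor(size, dtype, tensor=tensor, rank=0)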

core.inference.communication_utils.broadcast_list(
size,
dtype,
list_values=None,
rank=0,
data_parallel=False,
)

Broadcast a list of values of a given dtype.

Parameters:
  • size – Number of elements in the list

  • dtype – Dtype of the values

  • list_values – Values to broadcast (only on the source rank)

  • rank – Rank to broadcast from

  • data_parallel (bool) – Broadcast across a single data parallel model replica.

core.inference.communication_utils.broadcast_int_list(size, int_list=None, rank=0, data_parallel=False)

Broadcast a list of integer values.

Parameters:
  • size – Number of elements in the list

  • int_list – Integer values to broadcast (only on the source rank)

  • rank – Rank to broadcast from

  • data_parallel (bool) – Broadcast across a single data parallel model replica.

core.inference.communication_utils.broadcast_float_list(
size,
float_list=None,
rank=0,
data_parallel=False,
)

Broadcast a list of float values.

Parameters:
  • size – Number of elements in the list

  • float_list – Float values to broadcast (only on the source rank)

  • rank – Rank to broadcast from

  • data_parallel (bool) – Broadcast across a single data parallel model replica.
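A minimal sketch of the list helpers, e.g. for sharing sampling parameters from rank 0; the names and values are illustrative assumptions and torch.distributed is assumed initialized:

import torch

from megatron.core.inference.communication_utils import (
    broadcast_float_list,
    broadcast_int_list,
)

src = torch.distributed.get_rank() == 0
# Only the source rank supplies the Python lists; other ranks pass None.
max_lengths = broadcast_int_list(2, int_list=[128, 256] if src else None, rank=0)
temperature = broadcast_float_list(1, float_list=[0.7] if src else None, rank=0)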