core.inference.communication_utils#
Module Contents#
Functions#
| Function | Description |
|---|---|
| is_pipeline_first_stage | Check if the current process is the first stage of the pipeline. |
| is_pipeline_last_stage | Check if the current process is the last stage of the pipeline. |
| _is_cuda | Check if a tensor is not None and is on a CUDA device. |
| _is_cuda_contiguous | Check if a tensor is not None, is on a CUDA device, and is contiguous. |
| broadcast_from_last_pipeline_stage | Broadcast a tensor from the last pipeline stage to all ranks. |
| recv_from_prev_pipeline_rank_ | Receive from the previous pipeline stage and update the input buffer in place. |
| send_to_next_pipeline_rank | Send output to the next pipeline stage. |
| broadcast_tensor | Broadcast a tensor from a specific source rank to all other ranks. |
| broadcast_list | Broadcast a list of values with a given type. |
| broadcast_int_list | Broadcast a list of integer values. |
| broadcast_float_list | Broadcast a list of float values. |
API#
- core.inference.communication_utils.is_pipeline_first_stage(pp_group: torch.distributed.ProcessGroup)#
Check if the current process is the first stage of the pipeline.
- core.inference.communication_utils.is_pipeline_last_stage(pp_group: torch.distributed.ProcessGroup)#
Check if the current process is the last stage of the pipeline.
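A minimal usage sketch, assuming a job launched with one process per pipeline stage (e.g. via torchrun) and using the world group as a stand-in for a dedicated pipeline-parallel group:

```python
import torch.distributed as dist

from core.inference.communication_utils import (
    is_pipeline_first_stage,
    is_pipeline_last_stage,
)

# Assumes torchrun has set RANK/WORLD_SIZE/MASTER_ADDR/MASTER_PORT.
dist.init_process_group(backend="gloo")
pp_group = dist.group.WORLD  # stand-in for a real pipeline-parallel group

if is_pipeline_first_stage(pp_group):
    print("first stage: run the embedding layer here")
if is_pipeline_last_stage(pp_group):
    print("last stage: run the output head here")
```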
- core.inference.communication_utils._is_cuda(tensor)#
Check if a tensor is not None and is on a CUDA device.
- core.inference.communication_utils._is_cuda_contiguous(tensor)#
Check if a tensor is not None, is on a CUDA device, and is contiguous.
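Both helpers are small validity guards. A plausible reconstruction of their checks (hypothetical; the actual bodies may differ):

```python
import torch

def _is_cuda_sketch(tensor):
    # The tensor exists and lives on a CUDA device.
    return tensor is not None and tensor.is_cuda

def _is_cuda_contiguous_sketch(tensor):
    # Additionally requires a contiguous memory layout.
    return tensor is not None and tensor.is_cuda and tensor.is_contiguous()
```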
- core.inference.communication_utils.broadcast_from_last_pipeline_stage(size: List[int], dtype: torch.dtype, tensor: Optional[torch.Tensor] = None, pp_group: Optional[torch.distributed.ProcessGroup] = None)#
Broadcast a tensor from the last pipeline stage to all ranks.
- Parameters:
size – Expected tensor size
dtype – Expected tensor dtype
tensor – Tensor to broadcast (only on last stage)
pp_group – Custom process group (if None, uses global state)
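A hedged usage sketch: every rank passes the same size/dtype metadata, only the last stage supplies the tensor, and all ranks receive the result. The shape and group setup here are illustrative:

```python
import torch
import torch.distributed as dist

from core.inference.communication_utils import (
    broadcast_from_last_pipeline_stage,
    is_pipeline_last_stage,
)

dist.init_process_group(backend="nccl")  # assumes a torchrun-style launch
pp_group = dist.group.WORLD  # stand-in for a dedicated pipeline group

size, dtype = [4, 8], torch.float32  # illustrative shape
# Only the last stage owns real data; every other rank passes None.
logits = (
    torch.randn(size, dtype=dtype, device="cuda")
    if is_pipeline_last_stage(pp_group)
    else None
)
result = broadcast_from_last_pipeline_stage(size, dtype, tensor=logits, pp_group=pp_group)
# After the call, every rank holds the same [4, 8] tensor.
```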
- core.inference.communication_utils.recv_from_prev_pipeline_rank_(recv_buffer: torch.Tensor = None, pp_group: Optional[torch.distributed.ProcessGroup] = None)#
Receive from the previous pipeline stage and update the input buffer in place (the trailing underscore follows the PyTorch in-place naming convention); see the paired send/receive sketch after the next entry.
- Parameters:
recv_buffer – Buffer to receive data into
pp_group – Custom process group (if None, uses global state)
- core.inference.communication_utils.send_to_next_pipeline_rank(tensor: torch.Tensor = None, pp_group: Optional[torch.distributed.ProcessGroup] = None)#
Send output to the next pipeline stage.
- Parameters:
tensor – Tensor to send
pp_group – Custom process group (if None, uses global state)
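These two calls are the matching halves of a point-to-point handoff. A hedged sketch of one pipeline step, assuming one process per stage:

```python
import torch
import torch.distributed as dist

from core.inference.communication_utils import (
    is_pipeline_first_stage,
    is_pipeline_last_stage,
    recv_from_prev_pipeline_rank_,
    send_to_next_pipeline_rank,
)

dist.init_process_group(backend="nccl")  # assumes a torchrun-style launch
pp_group = dist.group.WORLD  # stand-in for a dedicated pipeline group

if is_pipeline_first_stage(pp_group):
    hidden = torch.randn(4, 8, device="cuda")  # stand-in for embedded input
else:
    hidden = torch.empty(4, 8, device="cuda")
    # Fill `hidden` in place with the previous stage's output.
    recv_from_prev_pipeline_rank_(recv_buffer=hidden, pp_group=pp_group)

hidden = hidden + 1  # stand-in for this stage's layers

if not is_pipeline_last_stage(pp_group):
    send_to_next_pipeline_rank(hidden, pp_group=pp_group)
```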
- core.inference.communication_utils.broadcast_tensor(size, dtype, tensor=None, rank=0, data_parallel=False)#
Given the size and dtype of a tensor known on all ranks, and the tensor value present only on one specific rank, broadcast from that rank to all other ranks.
- Parameters:
data_parallel (bool) – Broadcast across a single data parallel model replica.
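A hedged sketch with rank 0 as the source (the default):

```python
import torch
import torch.distributed as dist

from core.inference.communication_utils import broadcast_tensor

dist.init_process_group(backend="nccl")  # assumes a torchrun-style launch

size, dtype = [2, 3], torch.float32  # metadata known on every rank
# Only the source rank supplies values; everyone else passes None.
src = (
    torch.arange(6, dtype=dtype, device="cuda").reshape(2, 3)
    if dist.get_rank() == 0
    else None
)
out = broadcast_tensor(size, dtype, tensor=src, rank=0)
# `out` is identical on all ranks afterwards.
```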
- core.inference.communication_utils.broadcast_list(size, dtype, list_values=None, rank=0, data_parallel=False)#
Broadcast a list of values with a given type.
- Parameters:
data_parallel (bool) – Broadcast across a single data parallel model replica.
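A hedged sketch of the generic list broadcast, here sharing prompt token IDs from rank 0; the values shown are illustrative:

```python
import torch
import torch.distributed as dist

from core.inference.communication_utils import broadcast_list

dist.init_process_group(backend="nccl")  # assumes a torchrun-style launch

token_ids = [101, 2023, 2003, 102] if dist.get_rank() == 0 else None
shared = broadcast_list(size=[4], dtype=torch.int64, list_values=token_ids, rank=0)
# Every rank now holds the same four values.
```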
- core.inference.communication_utils.broadcast_int_list(size, int_list=None, rank=0, data_parallel=False)#
Broadcast a list of integer values.
- Parameters:
data_parallel (bool) – Broadcast across a single data parallel model replica.
- core.inference.communication_utils.broadcast_float_list(size, float_list=None, rank=0, data_parallel=False)#
Broadcast a list of float values.
- Parameters:
data_parallel (bool) – Broadcast across a single data parallel model replica.
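The int and float variants presumably fix the dtype of the generic list broadcast. A hedged sketch sharing sampling parameters from rank 0:

```python
import torch.distributed as dist

from core.inference.communication_utils import broadcast_float_list

dist.init_process_group(backend="nccl")  # assumes a torchrun-style launch

# Hypothetical use: rank 0 picks the sampling parameters for all ranks.
params = [0.8, 0.95] if dist.get_rank() == 0 else None  # temperature, top_p
shared = broadcast_float_list(size=[2], float_list=params, rank=0)
```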