core.pipeline_parallel.p2p_communication#

Module Contents#

Classes#

P2PCommunicator

P2P (Point-to-Point) Communicator for pipeline parallelism.

Functions#

_batched_p2p_ops

_p2p_ops

is_single_shape

Check if the input is a single shape.

Data#

API#

core.pipeline_parallel.p2p_communication.Shape#

None

core.pipeline_parallel.p2p_communication._batched_p2p_ops(
*,
tensor_send_prev: Optional[torch.Tensor],
tensor_recv_prev: Optional[torch.Tensor],
tensor_send_next: Optional[torch.Tensor],
tensor_recv_next: Optional[torch.Tensor],
group: torch.distributed.ProcessGroup,
prev_pipeline_rank: int,
next_pipeline_rank: int,
)#
core.pipeline_parallel.p2p_communication._p2p_ops(
*,
tensor_send_prev: Optional[torch.Tensor],
tensor_recv_prev: Optional[torch.Tensor],
tensor_send_next: Optional[torch.Tensor],
tensor_recv_next: Optional[torch.Tensor],
group: torch.distributed.ProcessGroup,
prev_pipeline_rank: int,
next_pipeline_rank: int,
)#
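
Both helpers post the same four optional transfers with the previous and next pipeline ranks; the batched variant groups them into a single torch.distributed.batch_isend_irecv call, while the non-batched variant issues individual isend/irecv requests. The sketch below illustrates the batched pattern with plain torch.distributed primitives; it is illustrative only, not the library's exact implementation.

    from typing import Optional

    import torch
    import torch.distributed as dist

    def batched_p2p_sketch(
        *,
        tensor_send_prev: Optional[torch.Tensor],
        tensor_recv_prev: Optional[torch.Tensor],
        tensor_send_next: Optional[torch.Tensor],
        tensor_recv_next: Optional[torch.Tensor],
        group: dist.ProcessGroup,
        prev_pipeline_rank: int,
        next_pipeline_rank: int,
    ) -> list:
        # Collect every requested transfer as a P2POp, then launch them together
        # so the backend can schedule the sends and receives as one group.
        ops = []
        if tensor_send_prev is not None:
            ops.append(dist.P2POp(dist.isend, tensor_send_prev, prev_pipeline_rank, group))
        if tensor_recv_prev is not None:
            ops.append(dist.P2POp(dist.irecv, tensor_recv_prev, prev_pipeline_rank, group))
        if tensor_send_next is not None:
            ops.append(dist.P2POp(dist.isend, tensor_send_next, next_pipeline_rank, group))
        if tensor_recv_next is not None:
            ops.append(dist.P2POp(dist.irecv, tensor_recv_next, next_pipeline_rank, group))
        return dist.batch_isend_irecv(ops) if ops else []
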
core.pipeline_parallel.p2p_communication.is_single_shape(x) → bool#

Check if the input is a single shape.
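
The send/recv methods below accept either one shape or a list of shapes, and this helper is what distinguishes the two cases. A rough, illustrative equivalent (the library's exact rules may differ):

    import torch

    def is_single_shape_sketch(x) -> bool:
        # A torch.Size, or a flat list/tuple of ints, counts as one shape;
        # anything else (e.g. a list of such objects) is treated as several shapes.
        if isinstance(x, torch.Size):
            return True
        if isinstance(x, (list, tuple)):
            return all(isinstance(dim, int) for dim in x)
        return False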

class core.pipeline_parallel.p2p_communication.P2PCommunicator(
pp_group: torch.distributed.ProcessGroup,
config: megatron.core.model_parallel_config.ModelParallelConfig,
)#

P2P (Point-to-Point) Communicator for pipeline parallelism.

This class handles communication between consecutive pipeline stages by managing the tensor exchanges (activations sent forward, gradients sent backward) that the pipeline schedules require.

Initialization
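
Construction only needs the pipeline-parallel process group and a ModelParallelConfig. A minimal sketch, assuming torch.distributed and Megatron's model-parallel state are already initialized; the specific config fields shown (pipeline_dtype, variable_seq_lengths, batch_p2p_comm) are assumptions and may differ between versions:

    import torch
    from megatron.core import parallel_state
    from megatron.core.model_parallel_config import ModelParallelConfig
    from megatron.core.pipeline_parallel.p2p_communication import P2PCommunicator

    # Assumes torch.distributed is initialized and the model-parallel groups exist,
    # e.g. via parallel_state.initialize_model_parallel(pipeline_model_parallel_size=4).
    config = ModelParallelConfig(
        pipeline_model_parallel_size=4,
        pipeline_dtype=torch.bfloat16,   # dtype used for p2p tensor transfers (assumed field)
        variable_seq_lengths=False,      # True enables the shape handshake below (assumed field)
        batch_p2p_comm=True,             # prefer batched isend/irecv (assumed field)
    )

    pp_group = parallel_state.get_pipeline_model_parallel_group()
    communicator = P2PCommunicator(pp_group=pp_group, config=config)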

_communicate_shapes(
tensor_send_next,
tensor_send_prev,
recv_prev,
recv_next,
)#

Communicate tensor shapes between stages. Used to exchange tensor shapes before the actual tensor communication happens, so that the receiving stage can allocate correctly sized buffers. This is required when the sequence lengths across micro-batches are not uniform.

Parameters:
  • tensor_send_next – tensor to send to next rank (no tensor sent if set to None).

  • tensor_send_prev – tensor to send to prev rank (no tensor sent if set to None).

  • recv_prev – boolean for whether tensor should be received from previous rank.

  • recv_next – boolean for whether tensor should be received from next rank.

Returns:

(recv_prev_shape, recv_next_shape)
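
When shapes are not known in advance, the receiving stage cannot pre-allocate its buffer, so each pair of neighbours first exchanges a small int64 tensor holding the shape and only then transfers the payload. A simplified sketch of that handshake between two adjacent ranks, written with plain torch.distributed for illustration (the real method handles both directions and uses the pipeline process group):

    import torch
    import torch.distributed as dist

    def send_with_shape(tensor: torch.Tensor, next_rank: int) -> None:
        # Sender: ship the shape ahead of the payload so the receiver can
        # allocate a matching buffer before the real transfer starts.
        shape_tensor = torch.tensor(tensor.size(), dtype=torch.int64, device="cuda")
        dist.send(shape_tensor, dst=next_rank)
        dist.send(tensor, dst=next_rank)

    def recv_with_shape(ndim: int, dtype: torch.dtype, prev_rank: int) -> torch.Tensor:
        # Receiver: get the shape first, allocate, then receive the payload.
        shape_tensor = torch.empty(ndim, dtype=torch.int64, device="cuda")
        dist.recv(shape_tensor, src=prev_rank)
        buffer = torch.empty(*shape_tensor.tolist(), dtype=dtype, device="cuda")
        dist.recv(buffer, src=prev_rank)
        return buffer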

_communicate(
*,
tensor_send_next: Optional[torch.Tensor],
tensor_send_prev: Optional[torch.Tensor],
recv_prev: bool,
recv_next: bool,
tensor_shape: core.pipeline_parallel.p2p_communication.Shape,
wait_on_reqs: bool = True,
) → Tuple[torch.Tensor, torch.Tensor]#

Communicate tensors between stages. Used as a helper method by the other communication methods, which are in turn used by the pipeline schedules in megatron/schedules.py.

Parameters:
  • tensor_send_next (torch.Tensor, optional) – Tensor to send to next rank (no tensor sent if None)

  • tensor_send_prev (torch.Tensor, optional) – Tensor to send to prev rank (no tensor sent if None)

  • recv_prev (boolean, required) – whether tensor should be received from previous rank.

  • recv_next (boolean, required) – whether tensor should be received from next rank.

  • tensor_shape (List[int] or torch.Size, required) – shape of tensor to receive (this method assumes that all tensors sent and received in a single function call are the same shape).

  • wait_on_reqs (boolean, optional, default=True) – For non-batched p2p communication, wait on each request before returning.

Returns:

tuple containing

  • tensor_recv_prev: torch.Tensor if recv_prev is True, None otherwise.

  • tensor_recv_next: torch.Tensor if recv_next is True, None otherwise.
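
All of the public send/recv methods below reduce to a call of this helper. As an example, a steady-state step on an intermediate stage can push its output to the next stage while posting a receive from the previous stage in a single call. A hedged sketch, assuming a communicator constructed as above:

    from typing import Tuple

    import torch

    def steady_state_exchange(
        communicator,                        # a P2PCommunicator (see the construction sketch)
        output_tensor: torch.Tensor,         # this stage's activation for the current micro-batch
        tensor_shape: Tuple[int, int, int],  # e.g. (seq_len, micro_batch_size, hidden_size)
    ) -> torch.Tensor:
        # Send activations forward and, in the same exchange, receive the
        # next micro-batch's activations from the previous stage.
        input_tensor, _ = communicator._communicate(
            tensor_send_next=output_tensor,
            tensor_send_prev=None,
            recv_prev=True,
            recv_next=False,
            tensor_shape=tensor_shape,
            wait_on_reqs=True,
        )
        return input_tensor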

recv_forward(
tensor_shapes,
is_first_stage: bool,
) → Union[torch.Tensor, list[torch.Tensor]]#

Receive tensor from previous rank in pipeline (forward receive).

recv_backward(
tensor_shapes,
is_last_stage: bool,
) → Union[torch.Tensor, list[torch.Tensor]]#

Receive tensor from next rank in pipeline (backward receive).

send_forward(output_tensors, is_last_stage: bool) → None#

Send tensor to next rank in pipeline (forward send).

send_backward(input_tensor_grads, is_first_stage: bool) → None#

Send tensor to previous rank in pipeline (backward send).
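
Taken together, the four methods above cover the unfused exchanges a pipeline schedule needs. A simplified, hedged sketch of one micro-batch on an intermediate stage; model_chunk is a placeholder callable and the backward pass itself is elided:

    import torch

    def run_one_microbatch(communicator, model_chunk, tensor_shapes,
                           is_first_stage: bool, is_last_stage: bool) -> None:
        # Forward: the first stage reads real data from its data loader instead of
        # receiving activations, which is what the is_first_stage flag signals.
        input_tensor = communicator.recv_forward(tensor_shapes, is_first_stage=is_first_stage)
        output_tensor = model_chunk(input_tensor)
        communicator.send_forward(output_tensor, is_last_stage=is_last_stage)

        # Backward: the last stage owns the loss, which is what is_last_stage signals.
        # output_tensor_grad would seed the elided backward pass.
        output_tensor_grad = communicator.recv_backward(tensor_shapes, is_last_stage=is_last_stage)
        input_tensor_grad = torch.zeros_like(input_tensor) if input_tensor is not None else None
        communicator.send_backward(input_tensor_grad, is_first_stage=is_first_stage)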

send_forward_recv_backward(
output_tensors,
tensor_shapes,
is_last_stage: bool,
) → Union[torch.Tensor, list[torch.Tensor]]#

Batched send and recv with next rank in pipeline.

send_backward_recv_forward(
input_tensor_grads,
tensor_shapes,
is_first_stage: bool,
) → Union[torch.Tensor, list[torch.Tensor]]#

Batched send and recv with previous rank in pipeline.
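
These two fused variants let the steady state of a 1F1B schedule overlap the forward send with the backward receive and vice versa, halving the number of separate exchanges per step. A hedged sketch of the steady-state body using them; the backward pass is again elided:

    import torch

    def steady_state_step(communicator, model_chunk, tensor_shapes,
                          is_first_stage: bool, is_last_stage: bool):
        # Run the forward pass, then ship the activation forward while picking up
        # the gradient for an earlier micro-batch from the next stage.
        input_tensor = communicator.recv_forward(tensor_shapes, is_first_stage=is_first_stage)
        output_tensor = model_chunk(input_tensor)
        output_tensor_grad = communicator.send_forward_recv_backward(
            output_tensor, tensor_shapes, is_last_stage=is_last_stage
        )

        # After the (elided) backward pass, send the input gradient to the previous
        # stage while receiving the activations for the next micro-batch.
        input_tensor_grad = torch.zeros_like(input_tensor) if input_tensor is not None else None
        next_input_tensor = communicator.send_backward_recv_forward(
            input_tensor_grad, tensor_shapes, is_first_stage=is_first_stage
        )
        return next_input_tensor, output_tensor_grad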

send_forward_recv_forward(
output_tensor: torch.Tensor,
recv_prev: bool,
tensor_shape: core.pipeline_parallel.p2p_communication.Shape,
overlap_p2p_comm: bool = False,
) → torch.Tensor#

Batched recv from previous rank and send to next rank in pipeline.
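
This method lets interleaved schedules fuse a chunk's forward send with the receive of the next chunk's input; with overlap_p2p_comm=True the call is expected not to block, so the exchange can overlap with compute (the return value in that mode is not shown here). A hedged sketch of the blocking form:

    from typing import Tuple

    import torch

    def forward_exchange(communicator, output_tensor: torch.Tensor,
                         tensor_shape: Tuple[int, int, int], recv_prev: bool) -> torch.Tensor:
        # Send this chunk's activation to the next stage and, when recv_prev is True,
        # receive the previous stage's activation for the next micro-batch.
        return communicator.send_forward_recv_forward(
            output_tensor,
            recv_prev=recv_prev,
            tensor_shape=tensor_shape,
            overlap_p2p_comm=False,  # block until the exchange completes
        )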

send_backward_recv_backward(
input_tensor_grad: torch.Tensor,
recv_next: bool,
tensor_shape: core.pipeline_parallel.p2p_communication.Shape,
overlap_p2p_comm: bool = False,
) → torch.Tensor#

Batched recv from next rank and send to previous rank in pipeline.

send_forward_backward_recv_forward_backward(
output_tensor: torch.Tensor,
input_tensor_grad: torch.Tensor,
recv_prev: bool,
recv_next: bool,
tensor_shape: core.pipeline_parallel.p2p_communication.Shape,
) → torch.Tensor#

Batched send and recv with previous and next ranks in pipeline.
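
This variant fuses all four directions into one exchange: an activation goes forward and an input gradient goes backward while their counterparts are received. A hedged call sketch; the structure of the returned value is left unexamined here since how it bundles the received activation and gradient may differ across versions:

    import torch

    def full_exchange(communicator, output_tensor: torch.Tensor,
                      input_tensor_grad: torch.Tensor, tensor_shape):
        # Push the current activation forward and the current input gradient backward,
        # while also receiving the next activation and the next output gradient.
        return communicator.send_forward_backward_recv_forward_backward(
            output_tensor,
            input_tensor_grad,
            recv_prev=True,
            recv_next=True,
            tensor_shape=tensor_shape,
        )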