core.resharding.nvshmem_copy_service.core.pipeline_executor

Pipelined communication execution engine.

Orchestrates the pack/send/unpack pipeline, using double-buffered send/receive buffers and CUDA events to synchronize the pack, send, and unpack streams.

Module Contents

Classes

PipelineExecutor – Executes pipelined NVSHMEM communication with pack/send/unpack overlap.

API

class core.resharding.nvshmem_copy_service.core.pipeline_executor.PipelineExecutor(
kernel_launcher: core.resharding.nvshmem_copy_service.core.kernel_launcher.KernelLauncher,
buffer_manager: core.resharding.nvshmem_copy_service.memory.double_buffer_manager.DoubleBufferManager,
my_pe: int,
)

Executes pipelined NVSHMEM communication with pack/send/unpack overlap.

Initialization

Initialize pipeline executor.

Parameters:
  • kernel_launcher – KernelLauncher instance for pack/unpack kernels

  • buffer_manager – DoubleBufferManager for send/recv buffers

  • my_pe – This PE’s rank

set_streams(
pack_stream,
unpack_stream,
send_stream,
copy_stream,
torch_pack_stream,
torch_unpack_stream,
torch_copy_stream,
)

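Set the CUDA streams used for execution: the pack, unpack, send, and copy streams, plus the PyTorch stream objects (torch_pack_stream, torch_unpack_stream, torch_copy_stream) for the pack, unpack, and copy stages.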

set_events(pack_events: List, unpack_events: List)

Set the double-buffered CUDA events used to synchronize pack and unpack work with the other streams.

execute_pipeline(
iter_schedules: List[Dict[str, Optional[core.resharding.nvshmem_copy_service.nvshmem_types.ScheduledBatch]]],
num_iterations: int,
) -> None

Execute pipelined communication.

Each pipeline step performs the following stages:

  1. Pack NEXT iteration (async)

  2. Unpack PRIOR iteration (async)

  3. Send CURRENT iteration (sync)

  4. Barrier

  5. Wait for async pack/unpack to complete

Parameters:
  • iter_schedules – Per-iteration schedules; each entry maps a string key to a ScheduledBatch, or to None when there is no batch for that key

  • num_iterations – Total number of iterations
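To make the stage ordering concrete, here is a minimal pure-Python sketch that records which operations would be issued in each pipeline step and which buffer slot (iteration parity) each one touches. It is not the executor's implementation: the prologue that packs iteration 0 before the loop, the epilogue that unpacks the last iteration, and the `plan_pipeline`/`slot` helpers are assumptions made for illustration. It also assumes pack writes into the send half of a slot and unpack reads from the receive half.

```python
from typing import List, Tuple

# (stage, iteration, buffer slot); barrier/wait use slot -1 (no buffer).
Op = Tuple[str, int, int]


def slot(iteration: int) -> int:
    # Double buffering: consecutive iterations alternate between slot 0 and 1.
    return iteration % 2


def plan_pipeline(num_iterations: int) -> List[List[Op]]:
    """Group the issued operations into steps, one list per pipeline step."""
    if num_iterations <= 0:
        return []
    # Assumed prologue: iteration 0 must be packed before it can be sent.
    steps: List[List[Op]] = [[("pack", 0, slot(0))]]
    for i in range(num_iterations):
        step: List[Op] = []
        if i + 1 < num_iterations:
            step.append(("pack", i + 1, slot(i + 1)))    # async: NEXT iteration
        if i >= 1:
            step.append(("unpack", i - 1, slot(i - 1)))  # async: PRIOR iteration
        step.append(("send", i, slot(i)))                # sync: CURRENT iteration
        step.append(("barrier", i, -1))
        step.append(("wait", i, -1))  # wait for this step's async pack/unpack
        steps.append(step)
    # Assumed epilogue: the last iteration's data still needs unpacking.
    last = num_iterations - 1
    steps.append([("unpack", last, slot(last))])
    return steps


for step in plan_pipeline(3):
    print(step)
```

In every step the slot being packed or unpacked differs from the slot being sent, which is what lets the async pack/unpack overlap the synchronous send. The wait at the end of each step ensures that, before the next send, its packed data is ready and the receive slot it will overwrite has already been drained by unpack.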

_launch_pack(
iteration: int,
batch: core.resharding.nvshmem_copy_service.nvshmem_types.ScheduledBatch,
) -> None

Launch the pack kernel for the given iteration.

_launch_unpack(
iteration: int,
batch: core.resharding.nvshmem_copy_service.nvshmem_types.ScheduledBatch,
) -> None

Launch the unpack kernel for the given iteration.

process_self_moves(
send_requests: List[core.resharding.nvshmem_copy_service.nvshmem_types.SendRequest],
receive_requests: List[core.resharding.nvshmem_copy_service.nvshmem_types.ReceiveRequest],
) -> None

Handle same-PE transfers (where src_pe == dest_pe == my_pe).

Because no data crosses PEs, these transfers bypass the NVSHMEM pipeline and are performed as PyTorch copies on the copy stream.

Parameters:
  • send_requests – List of send requests

  • receive_requests – List of receive requests
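A minimal sketch of the same-PE path, assuming hypothetical request fields (`task_id` to pair a send with its receive, `src_pe`, `dest_pe`, and byte buffers standing in for tensors); the real `SendRequest`/`ReceiveRequest` types may pair requests differently, and `copy_self_moves` is an illustrative name, not this module's API. Only transfers whose source and destination are both `my_pe` are copied; everything else is left for the NVSHMEM pipeline.

```python
from dataclasses import dataclass
from typing import Dict, List


# Hypothetical stand-ins for SendRequest / ReceiveRequest; field names are assumed.
@dataclass
class SendRequest:
    task_id: int      # assumed key pairing a send with its receive
    src_pe: int
    dest_pe: int
    data: bytearray   # stands in for the source tensor


@dataclass
class ReceiveRequest:
    task_id: int
    src_pe: int
    dest_pe: int
    out: bytearray    # stands in for the destination tensor


def copy_self_moves(
    my_pe: int, sends: List[SendRequest], recvs: List[ReceiveRequest]
) -> int:
    """Copy every my_pe -> my_pe transfer directly and return how many were copied."""
    local: Dict[int, SendRequest] = {
        s.task_id: s for s in sends if s.src_pe == my_pe and s.dest_pe == my_pe
    }
    copied = 0
    for r in recvs:
        if r.src_pe != my_pe or r.dest_pe != my_pe:
            continue  # remote transfer: left to the pack/send/unpack pipeline
        s = local.get(r.task_id)
        if s is None:
            raise KeyError(f"no local send matches task {r.task_id}")
        if len(s.data) != len(r.out):
            raise ValueError(f"size mismatch for task {r.task_id}")
        r.out[:] = s.data  # plain local copy: no packing, no NVSHMEM put
        copied += 1
    return copied
```

With real tensors the copy line would become something like `dst.copy_(src)` issued inside `with torch.cuda.stream(copy_stream):`, so the copy is enqueued on the copy stream rather than the default stream.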