core.resharding.nvshmem_copy_service.core.pipeline_executor#

Pipelined communication execution engine.

Orchestrates the pack/send/unpack pipeline with double-buffering and proper stream synchronization.

Module Contents#

Classes#

PipelineExecutor

Executes pipelined NVSHMEM communication with pack/send/unpack overlap.

API#

class core.resharding.nvshmem_copy_service.core.pipeline_executor.PipelineExecutor(
kernel_launcher: core.resharding.nvshmem_copy_service.core.kernel_launcher.KernelLauncher,
buffer_manager: core.resharding.nvshmem_copy_service.memory.double_buffer_manager.DoubleBufferManager,
my_pe: int,
)#

Executes pipelined NVSHMEM communication with pack/send/unpack overlap.

Initialization

Initialize pipeline executor.

Parameters:
  • kernel_launcher – KernelLauncher instance for pack/unpack kernels

  • buffer_manager – DoubleBufferManager for send/recv buffers

  • my_pe – This PE’s rank

set_streams(
pack_stream,
unpack_stream,
send_stream,
copy_stream,
torch_pack_stream,
torch_unpack_stream,
torch_send_stream,
torch_copy_stream,
)#

Set CUDA streams for execution.

set_events(
pack_events: List,
unpack_events: List,
barrier_events: List,
)#

Set double-buffered CUDA events.
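Double-buffered events are typically selected by iteration parity, so that iteration *i* and iteration *i+1* use disjoint event sets. A minimal sketch of that indexing, using plain strings in place of `torch.cuda.Event` objects (the two-slot layout and `% 2` selection are assumptions for illustration, not the module's actual internals):

```python
def select_events(pack_events, unpack_events, barrier_events, iteration):
    """Pick the event set for this iteration's double-buffer slot."""
    slot = iteration % 2  # alternate between the two buffered slots
    return pack_events[slot], unpack_events[slot], barrier_events[slot]

# Stand-ins for torch.cuda.Event instances:
pack = ["pack_ev0", "pack_ev1"]
unpack = ["unpack_ev0", "unpack_ev1"]
barrier = ["bar_ev0", "bar_ev1"]

# Even iterations use slot 0, odd iterations slot 1.
assert select_events(pack, unpack, barrier, 0) == ("pack_ev0", "unpack_ev0", "bar_ev0")
assert select_events(pack, unpack, barrier, 3) == ("pack_ev1", "unpack_ev1", "bar_ev1")
```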

execute_pipeline(
iter_schedules: List[Dict[str, Optional[core.resharding.nvshmem_copy_service.nvshmem_types.ScheduledBatch]]],
num_iterations: int,
) → None#

Execute pipelined communication.

Pipeline stages:

  1. Pack NEXT iteration (async)

  2. Unpack PRIOR iteration (async)

  3. Send CURRENT iteration

  4. Barrier + record barrier event

  5. Wait for async pack/unpack to complete

Cross-stream synchronization uses lightweight CUDA events instead of cudaDeviceSynchronize (torch.cuda.synchronize). The pack kernel includes __threadfence_system() to ensure writes are visible to the NIC’s DMA engine, and barrier_events propagate NVSHMEM RDMA completion from send_stream to unpack_stream.

Parameters:
  • iter_schedules – List of iteration schedules

  • num_iterations – Total number of iterations
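The five stages above can be modeled as a plain-Python trace showing how the pack of iteration *i+1* and the unpack of iteration *i-1* overlap the send of iteration *i*. The loop structure and prologue/epilogue handling here are illustrative assumptions, not the executor's actual code:

```python
def pipeline_trace(num_iterations):
    """Return the stage sequence of a pack/send/unpack pipeline (illustrative)."""
    trace = [("pack", 0)]  # prologue: pack the first iteration up front
    for i in range(num_iterations):
        if i + 1 < num_iterations:
            trace.append(("pack", i + 1))      # 1. pack NEXT iteration (async)
        if i - 1 >= 0:
            trace.append(("unpack", i - 1))    # 2. unpack PRIOR iteration (async)
        trace.append(("send", i))              # 3. send CURRENT iteration
        trace.append(("barrier", i))           # 4. barrier + record barrier event
        trace.append(("wait", i))              # 5. wait for async pack/unpack
    trace.append(("unpack", num_iterations - 1))  # epilogue: last unpack
    return trace

trace = pipeline_trace(3)
# Every iteration is packed before it is sent, and sent before it is unpacked.
for i in range(3):
    assert trace.index(("pack", i)) < trace.index(("send", i)) < trace.index(("unpack", i))
```

The same ordering invariant is what the barrier events enforce on the device: unpack of iteration *i* may not begin until the RDMA completion of iteration *i*'s send has propagated from the send stream to the unpack stream.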

_launch_pack(
iteration: int,
batch: core.resharding.nvshmem_copy_service.nvshmem_types.ScheduledBatch,
) → None#

Launch pack kernel for given iteration.

_launch_unpack(
iteration: int,
batch: core.resharding.nvshmem_copy_service.nvshmem_types.ScheduledBatch,
) → None#

Launch unpack kernel for given iteration.

process_self_moves(
send_requests: List[core.resharding.nvshmem_copy_service.nvshmem_types.SendRequest],
receive_requests: List[core.resharding.nvshmem_copy_service.nvshmem_types.ReceiveRequest],
) → None#

Handle same-PE transfers (where src_pe == dest_pe == my_pe).

Uses PyTorch copy on the copy stream for efficiency.

Parameters:
  • send_requests – List of send requests

  • receive_requests – List of receive requests
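Same-PE transfers need no NVSHMEM traffic at all: the data can be copied directly from the send-side tensor to the receive-side tensor. A minimal sketch with Python lists standing in for device tensors (the request field names `src_pe`, `dest_pe`, and `data` are assumptions, and the real code enqueues `torch` copies on the copy stream rather than slice assignments):

```python
def process_self_moves(send_requests, receive_requests, my_pe):
    """Copy data directly for transfers whose source and destination are this PE."""
    for send, recv in zip(send_requests, receive_requests):
        if send["src_pe"] == my_pe and recv["dest_pe"] == my_pe:
            # On GPU this would be a torch tensor copy on the copy stream;
            # a slice assignment models the in-place device copy here.
            recv["data"][:] = send["data"]

sends = [{"src_pe": 0, "data": [1, 2, 3]}]
recvs = [{"dest_pe": 0, "data": [0, 0, 0]}]
process_self_moves(sends, recvs, my_pe=0)
assert recvs[0]["data"] == [1, 2, 3]
```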