core.resharding.nvshmem_copy_service.core.pipeline_executor
Pipelined communication execution engine.
Orchestrates the pack/send/unpack pipeline with double-buffering and explicit CUDA stream and event synchronization.
Module Contents
Classes
PipelineExecutor: Executes pipelined NVSHMEM communication with pack/send/unpack overlap.
API
- class core.resharding.nvshmem_copy_service.core.pipeline_executor.PipelineExecutor(
    kernel_launcher: core.resharding.nvshmem_copy_service.core.kernel_launcher.KernelLauncher,
    buffer_manager: core.resharding.nvshmem_copy_service.memory.double_buffer_manager.DoubleBufferManager,
    my_pe: int,
  )
Executes pipelined NVSHMEM communication with pack/send/unpack overlap.
Initialization
Initialize pipeline executor.
- Parameters:
kernel_launcher – KernelLauncher instance for pack/unpack kernels
buffer_manager – DoubleBufferManager for send/recv buffers
my_pe – NVSHMEM PE index (rank) of the calling process
- set_streams(
    pack_stream,
    unpack_stream,
    send_stream,
    copy_stream,
    torch_pack_stream,
    torch_unpack_stream,
    torch_copy_stream,
  )
Set CUDA streams for execution.
- set_events(pack_events: List, unpack_events: List)
Set double-buffered CUDA events.
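One way to read "double-buffered" is that there are two send slots and two receive slots that alternate between iterations. The pure-Python sketch below assumes the mapping slot = iteration % 2 (an assumption; the source does not state how slots are chosen) and checks that, under the pipeline stages listed for execute_pipeline, the work done in one step never touches the same slot twice: packing iteration i+1 writes a different send slot from the one iteration i is sent from, and unpacking iteration i-1 reads a different receive slot from the one iteration i lands in. In such a scheme, a per-slot pack event and unpack event would let a stream wait until the previous use of a slot has finished before reusing it. The names `NUM_SLOTS`, `slot_for`, and `step_slots` are hypothetical.

```python
from typing import Dict, Optional

NUM_SLOTS = 2  # assumed: two send slots and two receive slots (double buffering)


def slot_for(iteration: int) -> int:
    """Hypothetical mapping from iteration index to buffer slot."""
    return iteration % NUM_SLOTS


def step_slots(i: int, num_iterations: int) -> Dict[str, Optional[int]]:
    """Slots touched while the pipeline is on iteration i.

    None means that stage has no work at this step (pipeline fill or drain).
    """
    return {
        # Pack NEXT iteration into the other send slot.
        "pack_send_slot": slot_for(i + 1) if i + 1 < num_iterations else None,
        # Send CURRENT iteration; it arrives in the matching receive slot.
        "send_slot": slot_for(i),
        "recv_slot": slot_for(i),
        # Unpack PRIOR iteration from the other receive slot.
        "unpack_recv_slot": slot_for(i - 1) if i >= 1 else None,
    }


for i in range(4):
    s = step_slots(i, 4)
    # Packing never overwrites the send slot in flight, and unpacking
    # never reads the receive slot currently being written.
    assert s["pack_send_slot"] != s["send_slot"]
    assert s["unpack_recv_slot"] != s["recv_slot"]
    print(i, s)
```

Note that iterations i+1 and i-1 map to the same slot index, which is safe only because one is a send slot and the other a receive slot.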
- execute_pipeline(
    iter_schedules: List[Dict[str, Optional[core.resharding.nvshmem_copy_service.nvshmem_types.ScheduledBatch]]],
    num_iterations: int,
  )
Execute pipelined communication.
Pipeline stages:
1. Pack NEXT iteration (async)
2. Unpack PRIOR iteration (async)
3. Send CURRENT iteration (sync)
4. Barrier
5. Wait for async pack/unpack to complete
- Parameters:
iter_schedules – List of per-iteration schedules; each entry is a dict mapping a string key to a ScheduledBatch or None
num_iterations – Total number of iterations
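To make the stage ordering concrete, the following pure-Python sketch records the sequence of (stage, iteration) operations for a given number of iterations. It does not touch NVSHMEM or CUDA. The prologue (packing iteration 0 before the loop) and the epilogue (unpacking the last iteration after it) are assumptions about how the pipeline fills and drains, since the stage list above only describes the steady state; `pipeline_timeline` is a hypothetical helper.

```python
from typing import List, Tuple


def pipeline_timeline(num_iterations: int) -> List[Tuple[str, int]]:
    """Return the order of (stage, iteration) operations.

    Assumes a prologue that packs iteration 0 and an epilogue that
    unpacks the last iteration; the loop body follows the listed stages.
    """
    ops: List[Tuple[str, int]] = []
    if num_iterations == 0:
        return ops
    ops.append(("pack", 0))  # prologue (assumed): fill the first send buffer
    for i in range(num_iterations):
        if i + 1 < num_iterations:
            ops.append(("pack", i + 1))    # 1. async: pack NEXT iteration
        if i >= 1:
            ops.append(("unpack", i - 1))  # 2. async: unpack PRIOR iteration
        ops.append(("send", i))            # 3. sync: send CURRENT iteration
        ops.append(("barrier", i))         # 4. all PEs have sent iteration i
        ops.append(("wait", i))            # 5. wait for this step's pack/unpack
    ops.append(("unpack", num_iterations - 1))  # epilogue (assumed): drain
    return ops


for op in pipeline_timeline(3):
    print(op)
```

Every iteration is packed before it is sent and unpacked only after the barrier that follows its send, which is the invariant the overlap depends on.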
- _launch_pack(
    iteration: int,
    batch: core.resharding.nvshmem_copy_service.nvshmem_types.ScheduledBatch,
  )
Launch pack kernel for given iteration.
- _launch_unpack(
    iteration: int,
    batch: core.resharding.nvshmem_copy_service.nvshmem_types.ScheduledBatch,
  )
Launch unpack kernel for given iteration.
- process_self_moves(
    send_requests: List[core.resharding.nvshmem_copy_service.nvshmem_types.SendRequest],
    receive_requests: List[core.resharding.nvshmem_copy_service.nvshmem_types.ReceiveRequest],
  )
Handle same-PE transfers (where src_pe == dest_pe == my_pe).
Uses a PyTorch copy on the copy stream instead of routing the data through the NVSHMEM pack/send/unpack pipeline.
- Parameters:
send_requests – List of send requests
receive_requests – List of receive requests
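The selection step of a self-move pass can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the module's implementation: the `Request` dataclass and its `task_id`, `src_pe`, and `dest_pe` fields are hypothetical stand-ins for SendRequest and ReceiveRequest, and pairing a send with its receive by a shared `task_id` is an assumed matching rule. The actual data movement is only indicated in a comment, using the real PyTorch calls `Tensor.copy_(src, non_blocking=True)` and the `torch.cuda.stream(...)` context manager.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Request:
    """Hypothetical stand-in for SendRequest/ReceiveRequest (fields assumed)."""
    task_id: int
    src_pe: int
    dest_pe: int


def match_self_moves(
    send_requests: List[Request],
    receive_requests: List[Request],
    my_pe: int,
) -> List[Tuple[Request, Request]]:
    """Pair local sends with local receives where src_pe == dest_pe == my_pe."""
    # Index local receives by task_id so each send finds its partner in O(1).
    local_recvs = {
        r.task_id: r
        for r in receive_requests
        if r.src_pe == my_pe and r.dest_pe == my_pe
    }
    pairs: List[Tuple[Request, Request]] = []
    for s in send_requests:
        if s.src_pe == my_pe and s.dest_pe == my_pe and s.task_id in local_recvs:
            # A real pass would copy here, e.g. (assumed):
            #   with torch.cuda.stream(torch_copy_stream):
            #       dst.copy_(src, non_blocking=True)
            pairs.append((s, local_recvs[s.task_id]))
    return pairs


sends = [Request(1, 0, 0), Request(2, 0, 1)]
recvs = [Request(1, 0, 0), Request(3, 1, 0)]
print(match_self_moves(sends, recvs, my_pe=0))
```

Only task 1 qualifies in the usage example: task 2 leaves PE 0 and task 3 arrives from PE 1, so both still need the NVSHMEM path.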