core.pipeline_parallel.fine_grained_activation_offload#
Module Contents#
Classes#

| Class | Summary |
|---|---|
| `GPUTensorPool` | GPU memory pool for efficient allocation and deallocation of tensors. |
| `OffloadTensorGroup` | A group of tensors to be offloaded together. |
| `PipelineOffloadManager` | Singleton manager for coordinating activation offloading across pipeline stages. Manages chunk handlers, synchronizes GPU-CPU transfers, and handles virtual pipeline parallelism. |
| `ChunkOffloadHandler` | Handles activation offloading and reloading for a single pipeline chunk (microbatch). Manages tensor groups, coordinates asynchronous GPU-CPU transfers, and handles synchronization. |
| `FineGrainedOffloadingGroupCommitFunction` | Identity operation that marks the end of a layer group for offload synchronization. Triggers offload during forward and synchronizes reload during backward. |
| `FineGrainedOffloadingGroupStartFunction` | Identity operation that marks the start of a layer group for offload/reload. Prepares for offload during forward and triggers reload during backward. |
| `FineGrainedOffloadingBackwardRecordFunction` | Identity operation that records the backward event for CUDA graph capture and waits for the H2D stream during backward. |
| `FineGrainedActivationOffloadingInterface` | Interface for fine-grained activation offloading. |
Functions#

| Function | Summary |
|---|---|
| `debug_rank` | Print debug message for a specific rank when DEBUG is enabled. |
| `print_offload_summary_table` | Print an ASCII table summarizing offload bytes across all ranks. |
| `fine_grained_offloading_disable_offload` | Disable the offload. |
| `fine_grained_offloading_enable_offload` | Enable the offload. |
| `fine_grained_offloading_group_commit` | Mark the end of a layer group, trigger offloading, and optionally force-release tensors afterwards. |
| `fine_grained_offloading_group_flush_delayed_groups` | Flush the delayed groups. |
| `fine_grained_offloading_group_start` | Mark the start of a layer group and prepare for offload/reload. |
| `fine_grained_offloading_forward_record` | Record the forward event for CUDA graph capture. |
| `fine_grained_offloading_backward_record` | Record the backward event for CUDA graph capture. |
Data#
API#
- core.pipeline_parallel.fine_grained_activation_offload.DEBUG#
False
- core.pipeline_parallel.fine_grained_activation_offload.DEBUG_RANK#
0
- core.pipeline_parallel.fine_grained_activation_offload.debug_rank(message)#
Print debug message for a specific rank when DEBUG is enabled.
- core.pipeline_parallel.fine_grained_activation_offload.print_offload_summary_table(total_offload_bytes: Dict[str, int])#
Print an ASCII table summarizing offload bytes across all ranks.
Gathers offload data from all ranks and prints a formatted table on rank 0, with rows representing ranks and columns representing groups.
- Parameters:
total_offload_bytes – Dict mapping group names to offload bytes for this rank.
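The real routine gathers `total_offload_bytes` from every rank (via `torch.distributed`) before printing on rank 0. The following pure-Python sketch covers only the formatting half, for already-gathered per-rank dicts; the helper name `format_offload_table` is invented here for illustration.

```python
from typing import Dict, List

def format_offload_table(per_rank_bytes: List[Dict[str, int]]) -> str:
    """Format per-rank offload byte counts as an ASCII table.

    Rows represent ranks and columns represent groups, mirroring the
    layout described for print_offload_summary_table.
    """
    # Union of group names across all ranks, in first-seen order.
    groups: List[str] = []
    for rank_bytes in per_rank_bytes:
        for name in rank_bytes:
            if name not in groups:
                groups.append(name)
    header = ["rank"] + groups
    rows = [header]
    for rank, rank_bytes in enumerate(per_rank_bytes):
        rows.append([str(rank)] + [str(rank_bytes.get(g, 0)) for g in groups])
    # Pad each column to its widest cell, then join with separators.
    widths = [max(len(r[i]) for r in rows) for i in range(len(header))]
    lines = [" | ".join(c.ljust(w) for c, w in zip(row, widths)) for row in rows]
    sep = "-+-".join("-" * w for w in widths)
    return "\n".join([lines[0], sep] + lines[1:])
```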
- class core.pipeline_parallel.fine_grained_activation_offload.GPUTensorPool(device: str = 'cuda', pin_memory: bool = False)#
GPU memory pool for efficient allocation and deallocation of tensors.
Features:
Supports multiple tensor shapes and dtypes, each with its own pool
Dynamic allocation: tensors are created on-demand during allocation
Efficient reuse: freed tensors are returned to the pool for reuse
Uses queue-based management for O(1) allocation and deallocation
Example:
pool = GPUTensorPool(device='cuda:0')
tensor = pool.allocate((128, 512), dtype=torch.float32)
# ... use tensor ...
pool.free(tensor)
Initialization
Initialize GPU tensor pool.
- Parameters:
device – GPU device, default ‘cuda’
pin_memory – Whether to use pinned memory (mainly for CPU tensors)
- _get_pool_key(shape: Tuple, dtype: torch.dtype) → Tuple#
Generate a unique key for the pool based on shape and dtype.
- static _calculate_memory_size(shape: Tuple, dtype: torch.dtype) → int#
Calculate memory size in bytes.
- allocate(shape: Tuple, dtype: torch.dtype = torch.float32)#
Allocate a tensor with the specified shape and dtype.
- Parameters:
shape – Shape of the tensor
dtype – Data type of the tensor, default torch.float32
- Returns:
Allocated tensor
- free(tensor: torch.Tensor)#
Return a tensor to the pool for reuse.
- Parameters:
tensor – Tensor to free
- Raises:
ValueError – If tensor doesn’t belong to this pool
- get_pool_status(shape: Tuple = None, dtype: torch.dtype = None)#
Get the status of the memory pool.
- Parameters:
shape – If specified along with dtype, return status for that specific pool
dtype – Data type (required if shape is specified)
- Returns:
Dictionary containing status information
- reset()#
Reset the pool, marking all tensors as available.
- clear()#
Clear the pool and release all GPU memory.
- __del__()#
Destructor to ensure resources are released.
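The pooling scheme above can be sketched in pure Python. Byte buffers stand in for GPU tensors, and each shape key gets its own `deque`, so `allocate` and `free` stay O(1); the class name `BufferPool` is illustrative, not part of the module.

```python
from collections import deque
from typing import Dict, Tuple

class BufferPool:
    """Pure-Python sketch of the queue-based pooling scheme.

    bytearray buffers stand in for GPU tensors; each shape key gets
    its own deque so allocation and deallocation are O(1).
    """

    def __init__(self) -> None:
        self._pools: Dict[Tuple[int, ...], deque] = {}

    def allocate(self, shape: Tuple[int, ...]) -> bytearray:
        pool = self._pools.setdefault(shape, deque())
        if pool:
            return pool.popleft()          # reuse a freed buffer: O(1)
        size = 1
        for dim in shape:
            size *= dim
        return bytearray(size)             # created on demand

    def free(self, buf: bytearray, shape: Tuple[int, ...]) -> None:
        self._pools.setdefault(shape, deque()).append(buf)   # O(1)
```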
- class core.pipeline_parallel.fine_grained_activation_offload.OffloadTensorGroup(name)#
A group of tensors to be offloaded together.
Initialization
- push_tensor(tag, tensor)#
Push a tensor to the group.
- pop_tensor(tag)#
Pop a tensor from the group.
- record_offload_event(stream)#
Record the offload event.
- wait_offload_event(stream)#
Wait for the offload event.
- record_reload_event(stream)#
Record the reload event.
- wait_reload_event(stream)#
Wait for the reload event.
- update_offload_info(tensor)#
Update the offload information.
- class core.pipeline_parallel.fine_grained_activation_offload.PipelineOffloadManager#
Singleton manager for coordinating activation offloading across pipeline stages. Manages chunk handlers, synchronizes GPU-CPU transfers, and handles virtual pipeline parallelism.
Initialization
Initialize the manager with queues and dedicated CUDA streams.
- OFFLOAD_MGR#
None
- classmethod get_instance()#
Get the singleton instance of PipelineOffloadManager.
- classmethod reset_instance()#
Reset the singleton instance of PipelineOffloadManager.
- property d2h_stream#
Get the device-to-host (GPU to CPU) transfer stream.
- property h2d_stream#
Get the host-to-device (CPU to GPU) transfer stream.
- property cpu_tensor_pool#
Get the shared CPU tensor pool.
- push_offload_groups(group_hook, forced_released_tensors)#
Push the offload groups to the delayed queue.
- flush_delayed_groups()#
Flush the delayed groups.
- reset()#
Reset manager state for a new training iteration.
- property offload_summary_bytes: Dict[str, int]#
Offload summary bytes per group collected after warmup.
- property offload_summary_total_bytes: int#
Total offloaded bytes collected after warmup.
- flush()#
Flush all staged chunks to the backward queue in reverse order.
- disable_offload()#
Disable the offload.
- enable_offload()#
Enable the offload.
- post_warmup_callback()#
Callback after warmup.
- push(handler)#
Add a chunk handler to the backward queue.
- pop_backward_chunk(name=None)#
Get the next non-empty backward chunk containing the group with the given name.
- front_backward_chunk(name=None)#
Get the first non-empty backward chunk containing the group with the given name.
- init_model_chunk_offload_handler(vp_size, vp_stage, min_offloaded_tensor_size=1024 * 1024)#
Initialize a chunk offload handler for a model chunk (microbatch).
- Parameters:
vp_size – Virtual pipeline size
vp_stage – Virtual pipeline stage index (None means stage 0)
min_offloaded_tensor_size – Minimum tensor size (in elements) to offload
- pop_forward_chunk(name=None)#
Get the next forward pass chunk handler.
- cur_forward_chunk()#
Get the current forward pass chunk handler.
- cur_backward_chunk()#
Get the current backward pass chunk handler.
- mark_not_offloadable(tensor: torch.Tensor)#
Mark the current forward chunk as not offloadable.
- __enter__()#
Enter context manager to enable activation offloading hooks.
- __exit__(*args: Any)#
Exit context manager and restore original tensor saving behavior.
- on_save_for_backward(tensor: torch.Tensor) → Any#
Hook called when autograd saves a tensor for backward pass. Returns a tag to identify the tensor later.
- on_get_saved_tensor(saved_state: Any) → torch.Tensor#
Hook called when autograd retrieves a saved tensor during backward pass. Returns the actual tensor (potentially reloading from CPU).
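The two hooks above implement a tag indirection: autograd stores only a lightweight tag, while the manager keeps (and may offload) the real tensor. In PyTorch this is wired up through `torch.autograd.graph.saved_tensors_hooks`. A minimal pure-Python sketch of the indirection, with plain objects standing in for tensors:

```python
class TagSaveHooks:
    """Sketch of the tag indirection behind on_save_for_backward /
    on_get_saved_tensor. Plain objects stand in for tensors; the real
    manager may copy GPU -> CPU on save and CPU -> GPU on retrieval."""

    def __init__(self) -> None:
        self._next_tag = 0
        self._store = {}   # tag -> saved (possibly offloaded) tensor

    def on_save_for_backward(self, tensor):
        tag = self._next_tag
        self._next_tag += 1
        self._store[tag] = tensor   # real code may offload to CPU here
        return tag                  # autograd keeps only the tag

    def on_get_saved_tensor(self, tag):
        return self._store.pop(tag)  # real code may reload from CPU here
```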
- class core.pipeline_parallel.fine_grained_activation_offload.ChunkOffloadHandler(min_offloaded_tensor_size, cpu_tensor_pool)#
Handles activation offloading and reloading for a single pipeline chunk (microbatch). Manages tensor groups, coordinates asynchronous GPU-CPU transfers, and handles synchronization.
Initialization
- offload(src_tensor, pin_memory=True, use_cpu_pool=True)#
Offload a tensor from GPU to CPU, optionally using pinned memory and the shared CPU tensor pool.
- reload(state, non_blocking=None)#
Reload an offloaded tensor from CPU back to GPU.
- reset()#
Reset the chunk offload handler.
- find_group_with_name(name: str, start_index: int = 0)#
Find the group with the given name starting from the given index.
- is_empty_chunk(name=None)#
Check if this chunk has no tensors to manage.
- finish_all_groups(name=None) → bool#
Finish all groups.
- find_next_group(name=None)#
Find the next group with the given name.
- tensor_push(tensor)#
Push tensor to the offload handler.
- tensor_pop(tensor_tag)#
Pop tensor from the offload handler.
- tensor_need_offloading_checker(tensor)#
Check if the tensor needs to be offloaded.
- bulk_offload_group()#
Offload a group of tensors recorded in tensor_push().
- get_max_deduplicated_groups()#
Get the maximum number of deduplicated groups.
- bulk_reload_group()#
Reload a group of tensors from CPU to GPU in bulk.
- pre_reload_last_layer()#
Pre-reload the last layer of this chunk to hide reload latency.
- should_bulk_offload()#
Determine if the current group should be offloaded.
- bulk_offload(forced_released_tensors)#
Offload a group of tensors and optionally release their GPU memory.
- on_group_commit_forward(forced_released_tensors)#
Called at the end of a layer group’s forward pass to trigger offloading.
- bulk_reload()#
Reload the next group of tensors from CPU to GPU.
- on_group_commit_backward(name)#
Called at the end of a layer group’s backward pass. Ensures correct chunk is active and synchronizes reloads.
- on_group_start_forward(name)#
Called at the start of a layer group’s forward pass. Increments group index and prepares for offloading.
- on_group_start_backward()#
Called at the start of a layer group’s backward pass. Triggers reloading of tensors from CPU.
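A chunk handler commits layer groups in forward order and reloads them in reverse, since backward visits groups last-in-first-out. A minimal sketch of that ordering (the class name `GroupSchedule` is invented here; the real handler overlaps the copies on dedicated CUDA streams):

```python
class GroupSchedule:
    """Sketch of the forward/backward group ordering a chunk handler
    coordinates: groups are offloaded in forward order and reloaded
    in reverse, matching backward's last-in-first-out traversal."""

    def __init__(self) -> None:
        self.groups = []
        self.log = []

    def on_group_commit_forward(self, name: str) -> None:
        # End of a group's forward pass: record it and offload.
        self.groups.append(name)
        self.log.append(f"offload {name}")

    def run_backward(self) -> None:
        # Backward reloads groups in reverse commit order; real code
        # waits on the H2D copy event before using each group.
        for name in reversed(self.groups):
            self.log.append(f"reload {name}")
```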
- core.pipeline_parallel.fine_grained_activation_offload.fine_grained_offloading_disable_offload()#
Disable the offload.
- core.pipeline_parallel.fine_grained_activation_offload.fine_grained_offloading_enable_offload()#
Enable the offload.
- class core.pipeline_parallel.fine_grained_activation_offload.FineGrainedOffloadingGroupCommitFunction#
Bases: torch.autograd.Function

Identity operation that marks the end of a layer group for offload synchronization. Triggers offload during forward and synchronizes reload during backward.
- static forward(ctx, tensor, cur_forward_chunk, name, forced_released_tensors, delay_offload)#
- static backward(ctx, *grad_output)#
- core.pipeline_parallel.fine_grained_activation_offload.fine_grained_offloading_group_commit(tensor, name, forced_released_tensors=None, delay_offload=False)#
Mark the end of a layer group and trigger offloading. forced_released_tensors is an optional list of tensors to release after offloading; each released tensor's storage is freed via untyped_storage().resize_(0). Note: specify tensors here only when they are not automatically released by torch gc.
- core.pipeline_parallel.fine_grained_activation_offload.fine_grained_offloading_group_flush_delayed_groups()#
Flush the delayed groups.
- class core.pipeline_parallel.fine_grained_activation_offload.FineGrainedOffloadingGroupStartFunction#
Bases: torch.autograd.Function

Identity operation that marks the start of a layer group for offload/reload. Prepares for offload during forward and triggers reload during backward.
- static forward(ctx, tensor, cpu_offload_handler, name)#
- static backward(ctx, grad_output)#
- core.pipeline_parallel.fine_grained_activation_offload.fine_grained_offloading_group_start(tensor, name=None)#
Mark the start of a layer group and prepare for offload/reload.
- core.pipeline_parallel.fine_grained_activation_offload.fine_grained_offloading_forward_record(event: torch.cuda.Event)#
Record the forward event for CUDA graph capture.
- class core.pipeline_parallel.fine_grained_activation_offload.FineGrainedOffloadingBackwardRecordFunction#
Bases: torch.autograd.Function

Identity operation that records the backward event for CUDA graph capture and waits for the H2D stream during backward.
- static forward(ctx, tensor, event: torch.cuda.Event) → torch.Tensor#
Forward pass for CUDA graph capture.
- static backward(ctx, grad_output)#
Record the backward event and wait for the h2d stream on the CUDA graph stream.
- core.pipeline_parallel.fine_grained_activation_offload.fine_grained_offloading_backward_record(tensor, event: torch.cuda.Event)#
Record the backward event for CUDA graph capture.
- class core.pipeline_parallel.fine_grained_activation_offload.FineGrainedActivationOffloadingInterface(offload: bool, tensor: torch.Tensor, name: str)#
Interface for fine-grained activation offloading.
Initialization
- __enter__()#
Enter context manager to enable activation offloading hooks.
- __exit__(*args: Any)#
Exit context manager to disable activation offloading hooks.
- static init_chunk_handler(vp_size, vp_stage, min_offloaded_tensor_size)#
Initialize the chunk handler, called at the start of a microbatch forward pass.
- static get_context(flag)#
Get the fine-grained offload context.
- static group_commit(tensor, name, forced_released_tensors=None, delay_offload=False)#
Group commit the tensors.
- static mark_not_offloadable(tensor: torch.Tensor)#
Mark the tensor as not offloadable.
- static forward_record(event: torch.cuda.Event) → None#
Record the forward event for CUDA graph capture.
- static reset()#
Reset the chunk handler.
- static reset_instance()#
Reset the singleton instance.
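Putting it together, the interface is used as a context manager: hooks are active only inside the `with` block and original tensor saving is restored on exit. A pure-Python sketch of that pattern (the names `OffloadContext` and `save_tensor` are illustrative, not the module's API):

```python
class OffloadContext:
    """Sketch of the context-manager pattern the interface follows:
    offloading hooks are active only inside the `with` block,
    mirroring __enter__/__exit__ on the interface class."""

    active = False

    def __enter__(self):
        OffloadContext.active = True     # real code installs saved-tensor hooks
        return self

    def __exit__(self, *args):
        OffloadContext.active = False    # real code restores default saving

def save_tensor(tensor):
    # Offload only while the context is active; otherwise keep as-is.
    return ("offloaded", tensor) if OffloadContext.active else ("kept", tensor)
```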