core.pipeline_parallel.fine_grained_activation_offload#

Module Contents#

Classes#

GPUTensorPool

GPU memory pool for efficient allocation and deallocation of tensors.

OffloadTensorGroup

A group of tensors to be offloaded together.

PipelineOffloadManager

Singleton manager for coordinating activation offloading across pipeline stages. Manages chunk handlers, synchronizes GPU-CPU transfers, and handles virtual pipeline parallelism.

ChunkOffloadHandler

Handles activation offloading and reloading for a single pipeline chunk (microbatch). Manages tensor groups, coordinates asynchronous GPU-CPU transfers, and handles synchronization.

FineGrainedOffloadingGroupCommitFunction

Identity operation that marks the end of a layer group for offload synchronization. Triggers offload during forward and synchronizes reload during backward.

FineGrainedOffloadingGroupStartFunction

Identity operation that marks the start of a layer group for offload/reload. Prepares for offload during forward and triggers reload during backward.

FineGrainedOffloadingBackwardRecordFunction

Identity operation used during CUDA graph capture. Forward returns the tensor unchanged; backward records the backward event and makes the CUDA graph stream wait on the host-to-device (H2D) stream.

FineGrainedActivationOffloadingInterface

Interface for fine-grained activation offloading.

Functions#

debug_rank

Print debug message for a specific rank when DEBUG is enabled.

print_offload_summary_table

Print an ASCII table summarizing offload bytes across all ranks.

fine_grained_offloading_disable_offload

Disable activation offloading.

fine_grained_offloading_enable_offload

Enable activation offloading.

fine_grained_offloading_group_commit

Mark the end of a layer group and commit its tensors for offloading. forced_released_tensors is an optional list of tensors whose GPU storage is freed with untyped_storage().resize_(0) once offloading completes. Specify tensors here only when PyTorch's garbage collection would not otherwise release them.

fine_grained_offloading_group_flush_delayed_groups

Flush the delayed groups.

fine_grained_offloading_group_start

Mark the start of a layer group and prepare for offload/reload.

fine_grained_offloading_forward_record

Record the forward event for CUDA graph capture.

fine_grained_offloading_backward_record

Record the backward event for CUDA graph capture.

Data#

DEBUG

DEBUG_RANK

API#

core.pipeline_parallel.fine_grained_activation_offload.DEBUG#

False

core.pipeline_parallel.fine_grained_activation_offload.DEBUG_RANK#

0

core.pipeline_parallel.fine_grained_activation_offload.debug_rank(message)#

Print debug message for a specific rank when DEBUG is enabled.
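A minimal sketch of the gating debug_rank describes: print only when DEBUG is on and the current rank equals DEBUG_RANK. It assumes the caller passes the rank explicitly (the real function presumably queries the distributed runtime, which is an assumption); debug_rank_sketch and its extra parameters are hypothetical.

```python
DEBUG = False    # mirrors the module-level DEBUG default on this page
DEBUG_RANK = 0   # mirrors the module-level DEBUG_RANK default on this page


def debug_rank_sketch(message, rank, debug=None, target_rank=None):
    """Hypothetical stand-in for debug_rank; the rank is supplied by the caller.

    Returns True when the message is printed so the gating is easy to check.
    """
    debug = DEBUG if debug is None else debug
    target_rank = DEBUG_RANK if target_rank is None else target_rank
    if debug and rank == target_rank:
        print(f"[rank {rank}] {message}")
        return True
    return False
```

With the defaults above nothing is printed; passing debug=True prints only on rank 0.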

core.pipeline_parallel.fine_grained_activation_offload.print_offload_summary_table(
total_offload_bytes: Dict[str, int],
)#

Print an ASCII table summarizing offload bytes across all ranks.

Gathers offload data from all ranks and prints a formatted table on rank 0, with rows representing ranks and columns representing groups.

Parameters:

total_offload_bytes – Dict mapping group names to offload bytes for this rank.
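The table layout (one row per rank, one column per group) can be sketched as below. The sketch covers only the formatting step and assumes the per-rank dictionaries have already been gathered into a list indexed by rank; in a distributed job this could be done with torch.distributed.all_gather_object, but whether print_offload_summary_table does exactly that is an assumption. format_offload_table is hypothetical.

```python
from typing import Dict, List


def format_offload_table(per_rank: List[Dict[str, int]]) -> str:
    """Hypothetical formatter: rows are ranks, columns are group names, cells are bytes."""
    groups = sorted({name for d in per_rank for name in d})
    header = ["rank"] + groups
    # Groups a rank never offloaded are shown as 0 bytes.
    rows = [[str(rank)] + [str(d.get(g, 0)) for g in groups]
            for rank, d in enumerate(per_rank)]
    # Each column is as wide as its longest cell, header included.
    widths = [max(len(r[i]) for r in [header] + rows) for i in range(len(header))]
    sep = "+" + "+".join("-" * (w + 2) for w in widths) + "+"

    def fmt(cells):
        return "| " + " | ".join(c.rjust(w) for c, w in zip(cells, widths)) + " |"

    return "\n".join([sep, fmt(header), sep] + [fmt(r) for r in rows] + [sep])


print(format_offload_table([{"attn": 4096, "mlp": 8192}, {"attn": 4096}]))
```

Filling missing groups with 0 keeps every row the same width even when ranks offload different sets of groups.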

class core.pipeline_parallel.fine_grained_activation_offload.GPUTensorPool(device: str = 'cuda', pin_memory: bool = False)#

GPU memory pool for efficient allocation and deallocation of tensors.

Features:

  • Supports multiple tensor shapes and dtypes, each with its own pool

  • Dynamic allocation: tensors are created on-demand during allocation

  • Efficient reuse: freed tensors are returned to the pool for reuse

  • Uses queue-based management for O(1) allocation and deallocation

Example:

    import torch
    from core.pipeline_parallel.fine_grained_activation_offload import GPUTensorPool

    pool = GPUTensorPool(device='cuda:0')
    tensor = pool.allocate((128, 512), dtype=torch.float32)
    # ... use tensor ...
    pool.free(tensor)  # free() takes only the tensor

Initialization

Initialize GPU tensor pool.

Parameters:
  • device – GPU device, default ‘cuda’

  • pin_memory – Whether to use pinned memory (mainly for CPU tensors)

_get_pool_key(shape: Tuple, dtype: torch.dtype) → Tuple#

Generate a unique key for the pool based on shape and dtype.

static _calculate_memory_size(shape: Tuple, dtype: torch.dtype) → int#

Calculate memory size in bytes.
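Presumably the size is the element count times the per-element size of the dtype. A worked instance, assuming that formula and using the shape from the GPUTensorPool example:

```latex
\begin{aligned}
\text{bytes}(\text{shape}, \text{dtype}) &= \Bigl(\prod_i \text{shape}_i\Bigr) \times \text{element\_size}(\text{dtype}) \\
\text{bytes}\bigl((128, 512),\ \text{float32}\bigr) &= 128 \times 512 \times 4 = 262{,}144\ \text{bytes} = 256\ \text{KiB}
\end{aligned}
```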

allocate(
shape: Tuple,
dtype: torch.dtype = torch.float32,
) → torch.Tensor#

Allocate a tensor with the specified shape and dtype.

Parameters:
  • shape – Shape of the tensor

  • dtype – Data type of the tensor, default torch.float32

Returns:

Allocated tensor

free(tensor: torch.Tensor)#

Return a tensor to the pool for reuse.

Parameters:

tensor – Tensor to free

Raises:

ValueError – If tensor doesn’t belong to this pool

get_pool_status(
shape: Tuple = None,
dtype: torch.dtype = None,
) → Dict[str, Any]#

Get the status of the memory pool.

Parameters:
  • shape – If specified along with dtype, return status for that specific pool

  • dtype – Data type (required if shape is specified)

Returns:

Dictionary containing status information

reset()#

Reset the pool, marking all tensors as available.

clear()#

Clear the pool and release all GPU memory.

__del__()#

Destructor to ensure resources are released.
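The pooling scheme in the feature list (a separate pool per shape/dtype key, on-demand creation, queue-based O(1) reuse, ValueError for foreign tensors) can be modeled in pure Python. This is a sketch under stated assumptions, not the GPUTensorPool implementation: ToyTensorPool is hypothetical, bytearrays stand in for tensors, and dtypes are plain strings.

```python
from collections import deque
from math import prod
from typing import Deque, Dict, Tuple


class ToyTensorPool:
    """Hypothetical pure-Python model of the pooling scheme described for GPUTensorPool."""

    ELEMENT_SIZE = {"float32": 4, "float16": 2, "bfloat16": 2}

    def __init__(self) -> None:
        self._free: Dict[Tuple, Deque[bytearray]] = {}        # (shape, dtype) -> reusable buffers
        self._owned: Dict[int, Tuple[Tuple, bytearray]] = {}  # id(buffer) -> (key, buffer)
        self.created = 0                                       # count of fresh allocations

    def allocate(self, shape: Tuple[int, ...], dtype: str = "float32") -> bytearray:
        key = (tuple(shape), dtype)
        queue = self._free.setdefault(key, deque())
        if queue:                                    # O(1) reuse of a freed buffer
            return queue.popleft()
        buf = bytearray(prod(shape) * self.ELEMENT_SIZE[dtype])  # created on demand
        self._owned[id(buf)] = (key, buf)            # strong ref keeps id(buf) unique
        self.created += 1
        return buf

    def free(self, buf: bytearray) -> None:
        entry = self._owned.get(id(buf))
        if entry is None or entry[1] is not buf:
            raise ValueError("buffer does not belong to this pool")
        self._free[entry[0]].append(buf)             # O(1) return for later reuse
```

Freeing and reallocating the same shape and dtype hands back the same buffer without creating a new one; a different dtype uses a separate pool.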

class core.pipeline_parallel.fine_grained_activation_offload.OffloadTensorGroup(name)#

A group of tensors to be offloaded together.

Initialization

push_tensor(tag, tensor)#

Store tensor in the group under tag.

pop_tensor(tag)#

Remove and return the tensor stored under tag.

record_offload_event(stream)#

Record the offload event on stream.

wait_offload_event(stream)#

Make stream wait until the recorded offload event completes.

record_reload_event(stream)#

Record the reload event on stream.

wait_reload_event(stream)#

Make stream wait until the recorded reload event completes.

update_offload_info(tensor)#

Update the offload information.

class core.pipeline_parallel.fine_grained_activation_offload.PipelineOffloadManager#

Singleton manager for coordinating activation offloading across pipeline stages. Manages chunk handlers, synchronizes GPU-CPU transfers, and handles virtual pipeline parallelism.

Initialization

Initialize the manager with queues and dedicated CUDA streams.

OFFLOAD_MGR#

None

classmethod get_instance()#

Get the singleton instance of PipelineOffloadManager.

classmethod reset_instance()#

Reset the singleton instance of PipelineOffloadManager.
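The OFFLOAD_MGR class attribute (default None) together with get_instance and reset_instance suggests a lazily created class-level singleton. The sketch below assumes that pattern; ToyOffloadManager is hypothetical.

```python
class ToyOffloadManager:
    """Hypothetical singleton in the style of PipelineOffloadManager."""

    OFFLOAD_MGR = None  # holds the single instance; None until first use

    def __init__(self):
        self.staged_chunks = []

    @classmethod
    def get_instance(cls):
        # Create the instance on first access, then always return the same object.
        if cls.OFFLOAD_MGR is None:
            cls.OFFLOAD_MGR = cls()
        return cls.OFFLOAD_MGR

    @classmethod
    def reset_instance(cls):
        # Drop the instance; the next get_instance() call builds a fresh one.
        cls.OFFLOAD_MGR = None
```

Resetting the class attribute, rather than clearing fields on the existing object, guarantees that no stale state survives into the next instance.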

property d2h_stream#

Get the device-to-host (GPU to CPU) transfer stream.

property h2d_stream#

Get the host-to-device (CPU to GPU) transfer stream.

property cpu_tensor_pool#

Get the shared CPU tensor pool.

push_offload_groups(group_hook, forced_released_tensors)#

Push the offload groups to the delayed queue.

flush_delayed_groups()#

Flush the delayed groups.

reset()#

Reset manager state for a new training iteration.

property offload_summary_bytes: Dict[str, int]#

Offloaded bytes per group, collected after warmup.

property offload_summary_total_bytes: int#

Total offloaded bytes collected after warmup.

flush()#

Flush all staged chunks to the backward queue in reverse order.

disable_offload()#

Disable activation offloading.

enable_offload()#

Enable activation offloading.

post_warmup_callback()#

Callback after warmup.

push(handler)#

Add a chunk handler to the backward queue.

pop_backward_chunk(name=None)#

Get the next non-empty backward chunk containing the group with the given name.

front_backward_chunk(name=None)#

Get the first non-empty backward chunk containing the group with the given name.
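flush, pop_backward_chunk and front_backward_chunk together describe a queue that backward consumes in the reverse of forward order while skipping chunks with nothing to reload. The sketch assumes that skipped chunks are discarded by the pop and left in place by the peek; ToyChunk and ToyBackwardQueue are hypothetical stand-ins for ChunkOffloadHandler and the manager's queues.

```python
from collections import deque


class ToyChunk:
    """Hypothetical chunk: a label plus the names of the groups it holds."""

    def __init__(self, label, group_names):
        self.label = label
        self.group_names = list(group_names)

    def is_empty_chunk(self, name=None):
        # Empty if it holds no groups, or none with the requested name.
        if name is None:
            return not self.group_names
        return name not in self.group_names


class ToyBackwardQueue:
    def __init__(self):
        self.staged = []          # chunks in forward order
        self.backward = deque()   # chunks in the order backward visits them

    def flush(self):
        # Backward visits chunks in the reverse of their forward order.
        self.backward.extend(reversed(self.staged))
        self.staged.clear()

    def pop_backward_chunk(self, name=None):
        # Discard chunks with nothing to reload for `name`; return the first that has some.
        while self.backward:
            chunk = self.backward.popleft()
            if not chunk.is_empty_chunk(name):
                return chunk
        return None

    def front_backward_chunk(self, name=None):
        # Same search as pop_backward_chunk, but the queue is left untouched.
        for chunk in self.backward:
            if not chunk.is_empty_chunk(name):
                return chunk
        return None
```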

init_model_chunk_offload_handler(
vp_size,
vp_stage,
min_offloaded_tensor_size=1024 * 1024,
)#

Initialize a chunk offload handler for a model chunk (microbatch).

Parameters:
  • vp_size – Virtual pipeline size

  • vp_stage – Virtual pipeline stage index (None means stage 0)

  • min_offloaded_tensor_size – Minimum tensor size (in elements) to offload
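Because min_offloaded_tensor_size counts elements rather than bytes, the default of 1024 * 1024 corresponds to a different byte size for each dtype:

```latex
\begin{aligned}
1024 \times 1024 &= 1{,}048{,}576\ \text{elements} \\
\text{bfloat16 (2 bytes/element)}: &\quad 1{,}048{,}576 \times 2 = 2{,}097{,}152\ \text{bytes} = 2\ \text{MiB} \\
\text{float32 (4 bytes/element)}: &\quad 1{,}048{,}576 \times 4 = 4{,}194{,}304\ \text{bytes} = 4\ \text{MiB}
\end{aligned}
```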

pop_forward_chunk(name=None)#

Get the next forward pass chunk handler.

cur_forward_chunk()#

Get the current forward pass chunk handler.

cur_backward_chunk()#

Get the current backward pass chunk handler.

mark_not_offloadable(tensor: torch.Tensor)#

Mark tensor as not offloadable within the current forward chunk.

__enter__()#

Enter context manager to enable activation offloading hooks.

__exit__(*args: Any)#

Exit context manager and restore original tensor saving behavior.

on_save_for_backward(tensor: torch.Tensor) → Any#

Hook called when autograd saves a tensor for backward pass. Returns a tag to identify the tensor later.

on_get_saved_tensor(saved_state: Any) → torch.Tensor#

Hook called when autograd retrieves a saved tensor during backward pass. Returns the actual tensor (potentially reloading from CPU).
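These two hooks follow a pack/unpack contract: the save hook receives the tensor autograd wants to keep and may return any object (here a tag), and the get hook receives that object during backward and must return the tensor. The pure-Python sketch models "offloading" as moving the payload into a host-side dict keyed by an integer tag; ToyHookStore is hypothetical.

```python
import itertools


class ToyHookStore:
    """Hypothetical model of the save/get hook pair: save hands out a tag, get resolves it."""

    def __init__(self):
        self._tags = itertools.count()
        self.host_store = {}   # tag -> payload held "off the device"

    def on_save_for_backward(self, tensor):
        # Called when autograd saves `tensor`: stash it and return a lightweight tag.
        tag = next(self._tags)
        self.host_store[tag] = tensor
        return tag

    def on_get_saved_tensor(self, saved_state):
        # Called during backward with the tag from above. A saved tensor may be
        # unpacked more than once, so the entry is read rather than removed.
        return self.host_store[saved_state]
```

In PyTorch, a pack/unpack pair like this can be installed with `torch.autograd.graph.saved_tensors_hooks(pack_hook, unpack_hook)`. That PipelineOffloadManager.__enter__ installs its hooks through that API is an assumption suggested by its description.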

class core.pipeline_parallel.fine_grained_activation_offload.ChunkOffloadHandler(min_offloaded_tensor_size, cpu_tensor_pool)#

Handles activation offloading and reloading for a single pipeline chunk (microbatch). Manages tensor groups, coordinates asynchronous GPU-CPU transfers, and handles synchronization.

Initialization

offload(src_tensor, pin_memory=True, use_cpu_pool=True)#

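Offload src_tensor from GPU to CPU memory and return the state later passed to reload(). pin_memory (default True) requests a pinned host buffer, and use_cpu_pool (default True) draws the buffer from the shared CPU tensor pool.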

reload(state, non_blocking=None)#

Reload the tensor described by state (as returned by offload()) from CPU back to GPU.

reset()#

Reset the chunk offload handler.

find_group_with_name(name: str, start_index: int = 0)#

Find the group with the given name starting from the given index.

is_empty_chunk(name=None)#

Check if this chunk has no tensors to manage.

finish_all_groups(name=None) → bool#

Finish all groups.

find_next_group(name=None)#

Find the next group with the given name.

tensor_push(tensor)#

Push tensor to the offload handler.

tensor_pop(tensor_tag)#

Pop tensor from the offload handler.

tensor_need_offloading_checker(tensor)#

Check if the tensor needs to be offloaded.
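A plausible per-tensor rule, assumed here rather than taken from the source, combines the min_offloaded_tensor_size threshold (in elements) with the marking done by mark_not_offloadable. ToyTensor, its offloadable flag, and the `>=` comparison are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class ToyTensor:
    numel: int
    offloadable: bool = True   # hypothetical flag cleared by mark_not_offloadable


def mark_not_offloadable(tensor: ToyTensor) -> None:
    tensor.offloadable = False


def needs_offloading(tensor: ToyTensor, min_offloaded_tensor_size: int = 1024 * 1024) -> bool:
    # Tensors explicitly opted out stay on the device regardless of size.
    if not tensor.offloadable:
        return False
    # Small tensors are skipped, presumably because the transfer cost outweighs the memory saved.
    return tensor.numel >= min_offloaded_tensor_size
```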

bulk_offload_group()#

Offload the group of tensors recorded by tensor_push().

get_max_deduplicated_groups()#

Get the maximum number of deduplicated groups.

bulk_reload_group()#

Reload, in bulk, a group of tensors previously offloaded by bulk_offload_group().

pre_reload_last_layer()#

Pre-reload the last layer of this chunk to hide reload latency.

should_bulk_offload()#

Determine if the current group should be offloaded.

bulk_offload(forced_released_tensors)#

Offload a group of tensors and optionally release their GPU memory.

on_group_commit_forward(forced_released_tensors)#

Called at the end of a layer group’s forward pass to trigger offloading.

bulk_reload()#

Reload the next group of tensors from CPU to GPU.

on_group_commit_backward(name)#

Called at the end of a layer group’s backward pass. Ensures correct chunk is active and synchronizes reloads.

on_group_start_forward(name)#

Called at the start of a layer group’s forward pass. Increments group index and prepares for offloading.

on_group_start_backward()#

Called at the start of a layer group’s backward pass. Triggers reloading of tensors from CPU.
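The forward and backward hooks above imply an ordering: groups are offloaded in layer order during forward and reloaded last-to-first during backward. The trace below is a hypothetical model with no real copies. ToyChunkHandler is not the library class, its on_group_commit_forward omits the forced_released_tensors argument, and the early reload performed by pre_reload_last_layer is not modeled.

```python
class ToyChunkHandler:
    """Hypothetical trace of the per-group hooks on one chunk; no real copies happen."""

    def __init__(self):
        self.groups = []   # one dict per layer group, in forward order
        self.log = []

    # Forward: groups are opened, filled, and committed in layer order.
    def on_group_start_forward(self, name):
        self.groups.append({"name": name, "tensors": [], "on_host": False})

    def tensor_push(self, tensor):
        self.groups[-1]["tensors"].append(tensor)

    def on_group_commit_forward(self):
        group = self.groups[-1]
        group["on_host"] = True                 # stands in for the async D2H copy
        self.log.append("offload " + group["name"])

    # Backward: the most recently offloaded group is needed first.
    def on_group_start_backward(self):
        group = next(g for g in reversed(self.groups) if g["on_host"])
        group["on_host"] = False                # stands in for the async H2D copy
        self.log.append("reload " + group["name"])


handler = ToyChunkHandler()
for layer in ("layer0", "layer1", "layer2"):
    handler.on_group_start_forward(layer)
    handler.tensor_push(layer + ".activation")
    handler.on_group_commit_forward()
for _ in range(3):
    handler.on_group_start_backward()
```

The last group offloaded is the first one backward needs, which is why hiding its reload latency matters most.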

core.pipeline_parallel.fine_grained_activation_offload.fine_grained_offloading_disable_offload()#

Disable activation offloading.

core.pipeline_parallel.fine_grained_activation_offload.fine_grained_offloading_enable_offload()#

Enable activation offloading.

class core.pipeline_parallel.fine_grained_activation_offload.FineGrainedOffloadingGroupCommitFunction#

Bases: torch.autograd.Function

Identity operation that marks the end of a layer group for offload synchronization. Triggers offload during forward and synchronizes reload during backward.

static forward(
ctx,
tensor,
cur_forward_chunk,
name,
forced_released_tensors,
delay_offload,
)#

static backward(ctx, *grad_output)#

core.pipeline_parallel.fine_grained_activation_offload.fine_grained_offloading_group_commit(
tensor,
name,
forced_released_tensors=None,
delay_offload=False,
)#

Mark the end of a layer group and commit its tensors for offloading. forced_released_tensors is an optional list of tensors whose GPU storage is freed with untyped_storage().resize_(0) once offloading completes. Specify tensors here only when PyTorch's garbage collection would not otherwise release them.

core.pipeline_parallel.fine_grained_activation_offload.fine_grained_offloading_group_flush_delayed_groups()#

Flush the delayed groups.
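With delay_offload=True, a group commit presumably defers the group instead of offloading it immediately: push_offload_groups places it on a delayed queue, and flush_delayed_groups later issues the deferred offloads. The sketch assumes that behavior and that flushing preserves commit order; ToyDelayedOffload is hypothetical.

```python
class ToyDelayedOffload:
    """Hypothetical deferral queue: a commit either offloads now or waits for a flush."""

    def __init__(self):
        self.delayed = []     # group names waiting for flush_delayed_groups()
        self.offloaded = []   # group names whose offload has been issued, in order

    def group_commit(self, name, delay_offload=False):
        if delay_offload:
            self.delayed.append(name)   # analogous to push_offload_groups
        else:
            self.offloaded.append(name)

    def flush_delayed_groups(self):
        # Issue every deferred offload in the order the groups were committed.
        self.offloaded.extend(self.delayed)
        self.delayed.clear()
```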

class core.pipeline_parallel.fine_grained_activation_offload.FineGrainedOffloadingGroupStartFunction#

Bases: torch.autograd.Function

Identity operation that marks the start of a layer group for offload/reload. Prepares for offload during forward and triggers reload during backward.

static forward(ctx, tensor, cpu_offload_handler, name)#

static backward(ctx, grad_output)#

core.pipeline_parallel.fine_grained_activation_offload.fine_grained_offloading_group_start(tensor, name=None)#

Mark the start of a layer group and prepare for offload/reload.

core.pipeline_parallel.fine_grained_activation_offload.fine_grained_offloading_forward_record(
event: torch.cuda.Event,
) → None#

Record the forward event for CUDA graph capture.

class core.pipeline_parallel.fine_grained_activation_offload.FineGrainedOffloadingBackwardRecordFunction#

Bases: torch.autograd.Function

Identity operation used during CUDA graph capture. Forward returns the tensor unchanged; backward records the backward event and makes the CUDA graph stream wait on the host-to-device (H2D) stream.

static forward(ctx, tensor, event: torch.cuda.Event) → torch.Tensor#

Forward pass for CUDA graph capture.

static backward(ctx, grad_output)#

Record the backward event and make the CUDA graph stream wait on the H2D stream.

core.pipeline_parallel.fine_grained_activation_offload.fine_grained_offloading_backward_record(
tensor,
event: torch.cuda.Event,
) → torch.Tensor#

Record the backward event for CUDA graph capture.

class core.pipeline_parallel.fine_grained_activation_offload.FineGrainedActivationOffloadingInterface(
offload: bool,
tensor: torch.Tensor,
name: str,
)#

Interface for fine-grained activation offloading.

Initialization

__enter__()#

Enter context manager to enable activation offloading hooks.

__exit__(*args: Any)#

Exit context manager to disable activation offloading hooks.

static init_chunk_handler(vp_size, vp_stage, min_offloaded_tensor_size)#

Initialize the chunk handler, called at the start of a microbatch forward pass.

static get_context(flag)#

Get the fine-grained activation offloading context for flag.
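A common shape for a flag-driven context getter is to return the real context when enabled and a no-op context otherwise, so call sites need a single `with` statement. The sketch assumes get_context follows that pattern and uses contextlib.nullcontext for the disabled case; ToyOffloadContext and get_context_sketch are hypothetical.

```python
import contextlib


class ToyOffloadContext:
    """Hypothetical context that flips a flag while active (stands in for the offload hooks)."""

    active = False

    def __enter__(self):
        ToyOffloadContext.active = True
        return self

    def __exit__(self, *exc):
        ToyOffloadContext.active = False
        return False   # never swallow exceptions


def get_context_sketch(flag):
    # One code path at the call site: a real context when enabled, a no-op otherwise.
    return ToyOffloadContext() if flag else contextlib.nullcontext()
```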

static group_commit(
tensor,
name,
forced_released_tensors=None,
delay_offload=False,
)#

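Mark the end of a layer group and commit its tensors for offloading. The parameters match fine_grained_offloading_group_commit.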

static mark_not_offloadable(tensor: torch.Tensor)#

Mark the tensor as not offloadable.

static forward_record(event: torch.cuda.Event) → None#

Record the forward event for CUDA graph capture.

static reset()#

Reset the chunk handler.

static reset_instance()#

Reset the singleton instance.