core.utils#

Utility functions used throughout Megatron Core.

Module Contents#

Classes#

GlobalMemoryBuffer

Global buffer to avoid dynamic memory allocations. Caller should ensure that buffers of the same name are not used concurrently.

GlobalSymmetricMemoryBuffer

Global symmetric memory buffer used in inference. This buffer is used by mcore-inference’s low-latency NVLS all-gather and reduce-scatter collectives.

WrappedTensor

A wrapper for tensors that enables caller functions to pass an indirect reference to callee functions. By wrapping the tensor, the caller’s direct reference is removed, allowing the tensor to be garbage collected once the callee unwraps and frees it.

MakeViewlessTensor

Autograd function to make a viewless tensor.

_ValueWithRank

This is an internal class, not for use outside this module

_StragglerData

This is an internal dataclass, not for use outside this module

StragglerDetector

Singleton class implementing a per-rank straggler detector.

Functions#

null_decorator

No-op decorator.

experimental_fn

A decorator that marks a function as experimental. Experimental functions may change quickly and do not guarantee backward compatibility.

experimental_cls

A decorator that marks a class as experimental. Experimental classes may change quickly and do not guarantee backward compatibility.

get_torch_version

Get the PyTorch version from __version__; if not available, use pip's. The result is cached.

get_te_version

Get the TE version from __version__; if not available, use pip's. The result is cached.

is_te_min_version

Check if minimum version of transformer-engine is installed.

is_torch_min_version

Check if minimum version of torch is installed.

get_fa_version

Get the Flash Attention version from __version__; if not available, use pip's. The result is cached.

is_fa_min_version

Check if minimum version of flash-attn is installed.

get_mamba_version

Get the mamba version from __version__; if not available, use pip's. The result is cached.

is_mamba_min_version

Check if minimum version of mamba_ssm is installed.

get_causal_conv1d_version

Get the causal_conv1d version from __version__; if not available, use pip's. The result is cached.

is_causal_conv1d_min_version

Check if minimum version of causal_conv1d is installed.

check_mamba_sequence_packing_support

Checks whether causal_conv1d and mamba_ssm support sequence packing.

ensure_divisibility

Ensure that numerator is divisible by the denominator.

divide

Ensure that numerator is divisible by the denominator and return the division value.

deprecate_inference_params

Print warning for deprecated inference_params.

get_tensor_model_parallel_group_if_none

Issue a deprecation warning if tp_group is None and return the default tp group.

get_pg_size

Get world size for a distributed group.

get_pg_rank

Get rank for a distributed group.

get_pg_src_rank

Calculate the global rank corresponding to the first local rank in the given process group.

get_attr_wrapped_model

Get an attribute from a wrapped model. If return_model_obj is true, return the object that has the ‘attr’ attribute; otherwise, return the attribute directly.

get_model_type

Returns the model_type attribute.

get_model_xattn

Returns whether the model has the xattn_needed attribute.

get_model_config

Returns the config attribute; may return None.

_kernel_make_viewless_tensor

Make a viewless tensor.

make_viewless_tensor

Entry-point for creating viewless tensors.

assert_viewless_tensor

Assert that a tensor is not a view (i.e., its ‘._base’ field is not set).

safely_set_viewless_tensor_data

Safely set tensor’s ‘.data’ field.

init_method_normal

Init method based on N(0, sigma).

scaled_init_method_normal

Init method based on N(0, sigma/sqrt(2*num_layers)).

log_single_rank

If torch distributed is initialized, write log on only one rank

log_on_each_pipeline_stage

Log on first rank in each pipeline stage

check_param_hashes_across_dp_replicas

Computes hashes of all parameters in model, all-gathers hashes across DP replicas, and then checks for equality between the locally-computed hashes and those of other ranks.

make_tp_sharded_tensor_for_checkpoint

Helper for instantiating a ShardedTensor where the tp_axis dimension is sharded across TP group.

make_sharded_tensor_for_checkpoint

Helper for instantiating a non-sharded ShardedTensor (replicated across TP and DP group).

get_full_tensor_if_necessary

For a DTensor, gets the full tensor if some ranks would not have a local copy.

to_local_if_dtensor

Returns the local shard of the given tensor if it is a DTensor.

get_data_parallel_group_if_dtensor

Gets the data parallel group of the given tensor if it is a DTensor.

prepare_input_tensors_for_wgrad_compute

Ensure grad_output is stored in a contiguous buffer.

drain_embedding_wgrad_compute

Helper for performing embedding wgrad GEMMs during the pipeline drain phase; pipelines the AllGather and GEMMs.

local_multi_tensor_applier

Multi-tensor op applier.

local_multi_tensor_l2_norm

Computes the L2 norm for a list of contiguous tensors; works as a drop-in replacement for amp_C.multi_tensor_l2norm.

local_multi_tensor_scale

Works as a drop-in replacement for amp_C.multi_tensor_scale.

is_submodule

Check if a module is a submodule of another module.

get_batch_on_this_cp_rank

Slice batch input along sequence dimension into multiple chunks, which are parallelized across GPUs in a context parallel group.

configure_nvtx_profiling

Configure NVTX range profiling to be enabled or disabled.

_nvtx_range_get_func_path

Get the path of a function. Assumes it is being called from nvtx_range_push/pop.

nvtx_range_push

Push NVTX range onto stack. If msg is not provided, use the calling function’s path.

nvtx_range_pop

Pop NVTX range from stack. If msg is not provided, use the calling function’s path.

_nvtx_decorator_get_func_path

Get the path of a function.

nvtx_decorator

Decorator to add NVTX range to a function.

unwrap_model

Unwraps the model to return the final model instance.

maybe_cat

Concatenates a and b along dim if a and b exist.

get_asyncio_loop

Creates an asyncio loop if necessary and then returns the current asyncio loop.

trace_async_exceptions

Decorator to be applied to every coroutine that runs in a separate task.

get_mamba_inference_state_config_from_model

Returns Mamba inference state config from the model if it is a hybrid model.

deprecated

Mark a function as deprecated.

internal_api

Mark a function or class as internal API (not for external use).

experimental_api

Mark a function or class as experimental API.

Data#

API#

core.utils.logger#

‘getLogger(…)’

core.utils._te_version#

None

core.utils._fa_version#

None

core.utils._mamba_ssm_version#

None

core.utils._causal_conv1d_version#

None

core.utils.null_decorator(*args, **kwargs)#

No-op decorator.

exception core.utils.ExperimentalNotEnabledError#

Bases: Exception

Raised during calls to experimental code when ENABLE_EXPERIMENTAL is not set.

Initialization

Initialize self. See help(type(self)) for accurate signature.

core.utils.experimental_fn(introduced_with_version: str)#

A decorator that marks a function as experimental. Experimental functions may change quickly and do not guarantee backward compatibility.

Experimental functions have a limited lifetime and should either be productionized or deprecated.

Parameters:

introduced_with_version (str) – A version-like string of Mcore at time of introduction.

Raises:

ExperimentalNotEnabledError – Error raised when an experimental function is called without the experimental flag enabled.
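
A minimal usage sketch; the import path, version string, and function body are illustrative assumptions, not part of the documented API:

    from megatron.core.utils import experimental_fn

    @experimental_fn(introduced_with_version="0.9.0")
    def fused_variant(x):
        # Calling this raises ExperimentalNotEnabledError unless the
        # experimental flag (ENABLE_EXPERIMENTAL) is enabled.
        return x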

core.utils.experimental_cls(introduced_with_version: str)#

A decorator that marks a class as experimental. Experimental classes may change quickly and do not guarantee backward compatibility.

Experimental classes have a limited lifetime and should either be productionized or deprecated.

Parameters:

introduced_with_version (str) – A version-like string of Mcore at time of introduction.

Raises:

ExperimentalNotEnabledError – Error raised when an experimental class is called without the experimental flag enabled.

core.utils.get_torch_version()#

Get the PyTorch version from __version__; if not available, use pip's. The result is cached.

core.utils.get_te_version()#

Get the TE version from __version__; if not available, use pip's. The result is cached.

core.utils.is_te_min_version(version, check_equality=True)#

Check if minimum version of transformer-engine is installed.

core.utils.is_torch_min_version(version, check_equality=True)#

Check if minimum version of torch is installed.
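
A short sketch of the version helpers; the version numbers are illustrative:

    from megatron.core.utils import is_te_min_version, is_torch_min_version

    if is_torch_min_version("2.3.0"):
        ...  # safe to rely on torch >= 2.3.0 behavior
    if is_te_min_version("1.10.0", check_equality=True):
        ...  # transformer-engine >= 1.10.0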

core.utils.get_fa_version()#

Get the Flash Attention version from __version__; if not available, use pip's. The result is cached.

core.utils.is_fa_min_version(version, check_equality=True)#

Check if minimum version of flash-attn is installed.

core.utils.get_mamba_version()#

Get the mamba version from __version__; if not available, use pip's. The result is cached.

core.utils.is_mamba_min_version(version, check_equality=True)#

Check if minimum version of mamba_ssm is installed.

core.utils.get_causal_conv1d_version()#

Get the causal_conv1d version from __version__; if not available, use pip's. The result is cached.

core.utils.is_causal_conv1d_min_version(version, check_equality=True)#

Check if minimum version of causal_conv1d is installed.

core.utils.check_mamba_sequence_packing_support() → Tuple[bool, Optional[str]]#

Checks whether causal_conv1d and mamba_ssm support sequence packing.

core.utils.ensure_divisibility(numerator, denominator)#

Ensure that numerator is divisible by the denominator.

core.utils.divide(numerator, denominator)#

Ensure that numerator is divisible by the denominator and return the division value.
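
A small illustration of both helpers; the numbers are arbitrary:

    from megatron.core.utils import divide, ensure_divisibility

    ensure_divisibility(4096, 8)  # passes silently: 4096 % 8 == 0
    per_rank = divide(4096, 8)    # 512; raises if not evenly divisible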

core.utils.deprecate_inference_params(inference_context, inference_params)#

Print warning for deprecated inference_params.

core.utils.get_tensor_model_parallel_group_if_none(
tp_group,
is_expert=False,
check_initialized=True,
)#

Issue a deprecation warning if tp_group is None and return the default tp group.

core.utils.get_pg_size(group=None)#

Get world size for a distributed group.

Parameters:

group – Process group to get world size for. If None, uses default group.

Returns:

World size (1 if distributed not initialized or group is None, else group.size())

Return type:

int

core.utils.get_pg_rank(group=None)#

Get rank for a distributed group.

Parameters:

group – Process group to get rank for. If None, uses default group.

Returns:

Rank (0 if distributed not initialized or group is None, else group.rank())

Return type:

int

core.utils.get_pg_src_rank(group=None)#

Calculate the global rank corresponding to the first local rank in the given process group.

Parameters:

group – Process group to query. If None or distributed is not initialized, returns 0.

Returns:

The first (source) global rank in the group.

Return type:

int
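
A sketch of the three helpers with the default (None) group; the commented return values follow the documented fallbacks:

    from megatron.core.utils import get_pg_rank, get_pg_size, get_pg_src_rank

    group = None  # None falls back to the default process group
    world_size = get_pg_size(group)    # 1 if torch.distributed is uninitialized
    rank = get_pg_rank(group)          # 0 if torch.distributed is uninitialized
    src_rank = get_pg_src_rank(group)  # first global rank in the group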

core.utils.get_attr_wrapped_model(
model,
attr,
allow_none=True,
return_model_obj=False,
)#

Get an attribute from a wrapped model. If return_model_obj is true, return the object that has the ‘attr’ attribute; otherwise, return the attribute directly.

core.utils.get_model_type(model)#

Returns the model_type attribute.

core.utils.get_model_xattn(model)#

Returns whether the model has the xattn_needed attribute.

core.utils.get_model_config(model)#

Returns the config attribute; may return None.

class core.utils.GlobalMemoryBuffer#

Global buffer to avoid dynamic memory allocations. Caller should ensure that buffers of the same name are not used concurrently.

Initialization

get_tensor(
tensor_shape,
dtype,
name,
mem_alloc_context: Optional[Callable] = None,
)#

Returns (potentially) a sub-tensor from the self.buffer for the given shape.
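
A usage sketch; the shape, dtype, and buffer name are arbitrary choices for illustration:

    import torch

    from megatron.core.utils import GlobalMemoryBuffer

    buffer = GlobalMemoryBuffer()
    # The same name must not be used by two callers concurrently.
    workspace = buffer.get_tensor((1024, 4096), torch.float16, "mpu")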

class core.utils.GlobalSymmetricMemoryBuffer(size_in_mb, process_group)#

Global symmetric memory buffer used in inference. This buffer is used by mcore-inference’s low-latency NVLS all-gather and reduce-scatter collectives.

Initialization

_can_allocate(numel, dtype) → bool#

Returns whether enough symmetric memory is available for the given tensor shape and dtype.

_allocate(numel, dtype) → torch.Tensor#

Allocates a sub-tensor from the self.symm_buffer for the given numel and dtype.

maybe_get_tensor(tensor_shape, dtype)#

Returns (potentially) a sub-tensor from the self.symm_buffer for the given shape. If enough symmetric memory is not available, returns None.
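
A hedged sketch; tp_group, the buffer size, and the tensor shape are placeholders rather than documented defaults:

    import torch

    from megatron.core.utils import GlobalSymmetricMemoryBuffer

    symm_buffer = GlobalSymmetricMemoryBuffer(size_in_mb=64, process_group=tp_group)
    out = symm_buffer.maybe_get_tensor((256, 4096), torch.bfloat16)
    if out is None:
        ...  # not enough symmetric memory; fall back to a regular buffer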

core.utils._kernel_make_viewless_tensor(inp, requires_grad)#

Make a viewless tensor.

View tensors have the undesirable side effect of retaining a reference to the originally-viewed tensor, even after manually setting the '.data' field. This method creates a new tensor that links to the old tensor's data, without linking the viewed tensor, referenced via the '._base' field.

class core.utils.WrappedTensor(tensor: torch.Tensor)#

A wrapper for tensors that enables caller functions to pass an indirect reference to callee functions. By wrapping the tensor, the caller’s direct reference is removed, allowing the tensor to be garbage collected once the callee unwraps and frees it.

Initialization

unwrap()#

Returns the wrapped tensor while deleting the internal reference. Can only be called once.
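
A sketch of the intended hand-off pattern, with hypothetical caller/callee names:

    import torch

    from megatron.core.utils import WrappedTensor

    def consumer(wrapped: WrappedTensor) -> None:
        tensor = wrapped.unwrap()  # removes the internal reference; one-shot
        del tensor                 # last reference dropped; memory can be freed

    hidden_states = torch.randn(16, 1024)
    wrapped = WrappedTensor(hidden_states)
    del hidden_states  # the caller gives up its direct reference
    consumer(wrapped)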

class core.utils.MakeViewlessTensor#

Bases: torch.autograd.Function

Autograd function to make a viewless tensor.

This function should be used in cases where the computation graph needs to be propagated, but we only want a viewless tensor (e.g., ParallelTransformer’s hidden_states). Call this function by passing ‘keep_graph = True’ to ‘make_viewless_tensor()’.

static forward(ctx, inp, requires_grad)#

Runs the forward pass of _kernel_make_viewless_tensor.

static backward(ctx, grad_output)#

No-op

core.utils.make_viewless_tensor(inp, requires_grad, keep_graph)#

Entry-point for creating viewless tensors.

This method should be used, rather than calling ‘MakeViewlessTensor’ or ‘_kernel_make_viewless_tensor’ directly. This method acts as a switch for determining if an autograd function or a regular method should be used to create the tensor.
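
A minimal sketch, assuming the documented (inp, requires_grad, keep_graph) arguments:

    import torch

    from megatron.core.utils import assert_viewless_tensor, make_viewless_tensor

    x = torch.randn(8, 8, requires_grad=True)
    view = x.view(64)  # a view: its '._base' field is set
    out = make_viewless_tensor(view, requires_grad=True, keep_graph=True)
    assert_viewless_tensor(out)  # passes: 'out' is not a view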

core.utils.assert_viewless_tensor(tensor, extra_msg=None)#

Assert that a tensor is not a view (i.e., its ‘._base’ field is not set).

core.utils.safely_set_viewless_tensor_data(tensor, new_data_tensor)#

Safely set tensor’s ‘.data’ field.

Check first that the tensor is viewless (i.e., ‘._base’ not set). If not, raise an exception.

core.utils.init_method_normal(sigma)#

Init method based on N(0, sigma).

core.utils.scaled_init_method_normal(sigma, num_layers, multiplier=2.0)#

Init method based on N(0, sigma/sqrt(2*num_layers)).
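
A sketch assuming each helper returns a closure that initializes a tensor in place (the exact return type is not documented here); sigma and num_layers are illustrative:

    import torch

    from megatron.core.utils import init_method_normal, scaled_init_method_normal

    init_fn = init_method_normal(0.02)                      # draws from N(0, 0.02)
    out_proj_init_fn = scaled_init_method_normal(0.02, 24)  # N(0, 0.02 / sqrt(48))

    weight = torch.empty(1024, 1024)
    init_fn(weight)  # assumed to fill the tensor in place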

core.utils.log_single_rank(
logger: logging.Logger,
*args: Any,
rank: int = 0,
**kwargs: Any,
)#

If torch distributed is initialized, write log on only one rank

Parameters:
  • logger (logging.Logger) – The logger to write the logs

  • args (Tuple[Any]) – All logging.Logger.log positional arguments

  • rank (int, optional) – The rank to write on. Defaults to 0.

  • kwargs (Dict[str, Any]) – All logging.Logger.log keyword arguments
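
A usage sketch; the positional arguments are forwarded to logging.Logger.log, so the level comes first:

    import logging

    from megatron.core.utils import log_single_rank

    logger = logging.getLogger(__name__)
    log_single_rank(logger, logging.INFO, "initialized process groups", rank=0)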

core.utils.log_on_each_pipeline_stage(
logger: logging.Logger,
*args: Any,
tp_group: Optional[torch.distributed.ProcessGroup] = None,
dp_cp_group: Optional[torch.distributed.ProcessGroup] = None,
**kwargs: Any,
)#

Log on first rank in each pipeline stage

Parameters:
  • logger (logging.Logger) – The logger to write the logs

  • args (Tuple[Any]) – All logging.Logger.log positional arguments

  • kwargs (Dict[str, Any]) – All logging.Logger.log keyword arguments

core.utils.check_param_hashes_across_dp_replicas(
model: List[torch.nn.Module],
cross_check: bool = False,
) → bool#

Computes hashes of all parameters in model, all-gathers hashes across DP replicas, and then checks for equality between the locally-computed hashes and those of other ranks.

NOTE: This function computes SHA-1 hashes on the CPU and thus needs to move all param tensors from GPU to CPU first; as a result, this function is not intended to be called very frequently in the main training loop.

Parameters:
  • model (List[torch.nn.Module]) – List of model chunks whose parameter hashes need to be checked.

  • cross_check (bool) – If true, will check whether hashes match across all DP replicas.

Returns:

True if all param hashes match with corresponding hash on DP replica 0 or across all replicas if cross_check is enabled, False otherwise.

core.utils.make_tp_sharded_tensor_for_checkpoint(
tensor,
key,
tp_axis=0,
replica_id=None,
prepend_offsets=(),
**kwargs,
)#

Helper for instantiating a ShardedTensor where the tp_axis dimension is sharded across TP group.

Optionally, can provide offsets which prepend new dimensions to the tensor.

Parameters:
  • tensor – Tensor to shard

  • key – Key for the sharded tensor

  • tp_axis – Axis to shard across tensor parallel group (default: 0)

  • replica_id – Replica ID for the tensor (default: None)

  • prepend_offsets – Offsets to prepend to tensor dimensions (default: ())

  • **kwargs

    Additional arguments. May include:

    • tp_group: Tensor parallel group (default: None, falls back to parallel_state)

    • dp_cp_group: Data parallel + context parallel group (default: None, falls back to parallel_state)

core.utils.make_sharded_tensor_for_checkpoint(
tensor,
key,
prepend_offsets=(),
replica_id=None,
**kwargs,
)#

Helper for instantiating a non-sharded ShardedTensor (replicated across TP and DP group).

Optionally, can provide offsets which prepend new dimensions to the tensor.

Keyword Arguments:
  • tensor – Tensor to create sharded tensor for

  • key – Key for the sharded tensor

  • prepend_offsets – Offsets to prepend to tensor dimensions (default: ())

  • replica_id – Replica ID for the tensor (default: None)

  • **kwargs

    Additional arguments. May include:

    • tp_group: Tensor parallel group (default: None, falls back to parallel_state)

    • dp_cp_group: Data parallel + context parallel group (default: None, falls back to parallel_state)

core.utils.get_full_tensor_if_necessary(tensor)#

For a DTensor, gets the full tensor if some ranks would not have a local copy.

core.utils.to_local_if_dtensor(
tensor: Union[torch.Tensor, torch.distributed._tensor.DTensor],
) → torch.Tensor#

Returns the local shard of the given tensor if it is a DTensor.

core.utils.get_data_parallel_group_if_dtensor(
tensor: Union[torch.Tensor, torch.distributed._tensor.DTensor],
data_parallel_group: ProcessGroup = None,
) → Optional[ProcessGroup]#

Gets the data parallel group of the given tensor if it is a DTensor.

core.utils.prepare_input_tensors_for_wgrad_compute(
grad_output,
all_gathered_input,
)#

Ensure grad_output is stored in a contiguous buffer.

core.utils.drain_embedding_wgrad_compute(
config,
embedding_activation_buffer,
grad_output_buffer,
weight,
tp_group,
)#

Helper for performing embedding wgrad GEMMs during the pipeline drain phase; pipelines the AllGather and GEMMs.

Should only be used when pipeline model parallelism and gradient accumulation fusion are enabled.

core.utils.local_multi_tensor_applier(op, noop_flag_buffer, tensor_lists, *args)#

Multi-tensor op applier.

core.utils.local_multi_tensor_l2_norm(
chunk_size,
noop_flag,
tensor_lists,
per_tensor,
*args,
)#

Computes the L2 norm for a list of contiguous tensors; works as a drop-in replacement for amp_C.multi_tensor_l2norm.

core.utils.local_multi_tensor_scale(chunk_size, noop_flag, tensor_lists, scale)#

Works as a drop-in replacement for amp_C.multi_tensor_scale.

class core.utils._ValueWithRank(value: float, rank: int, unit: str = '')#

This is an internal class, not for use outside this module

.. attribute:: _rank

rank for the value

Type:

int

.. attribute:: _value

the value it stores, e.g., elapsed time

Type:

float

.. attribute:: _unit

unit for the value

Type:

str

Initialization

Initializer

Parameters:
  • _value (float) – the initial value with which it is initialized

  • _rank (int) – the rank number

  • _unit (str) – the unit of the value, e.g., ms or flops

__lt__(other) → bool#

Check if value of self is smaller than other’s value

Parameters:

other (_ValueWithRank) – The other object to compare with

Returns:

True if lhs._value of operand is less than rhs._value, else False

Return type:

bool

__gt__(other) → bool#

Check if value of self is larger than other’s value

Parameters:

other (_ValueWithRank) – The other object to compare with

Returns:

True if lhs._value of operand is greater than rhs._value, else False

Return type:

bool

__call__() → Tuple[float, int, str]#

Returns the value, the rank, and unit as a Tuple

Returns:

value, rank, unit

Return type:

Tuple[float, int, str]

__str__() → str#

String representation of the object

Returns:

stringified object

Return type:

str

class core.utils._StragglerData#

This is an internal dataclass, not for use outside this module

.. attribute:: min_elapsed

Type:

_ValueWithRank

.. attribute:: max_elapsed

Type:

_ValueWithRank

.. attribute:: min_btime

Type:

_ValueWithRank

.. attribute:: max_btime

Type:

_ValueWithRank

.. attribute:: min_temp

min gpu temp across all ranks

Type:

_ValueWithRank

.. attribute:: max_temp

max gpu temp across all ranks

Type:

_ValueWithRank

.. attribute:: min_power

Type:

_ValueWithRank

.. attribute:: max_power

Type:

_ValueWithRank

.. attribute:: min_util

min gpu util across all ranks

Type:

_ValueWithRank

.. attribute:: max_util

max gpu util across all ranks

Type:

_ValueWithRank

.. attribute:: min_clock

min gpu clock across all ranks

Type:

_ValueWithRank

.. attribute:: max_clock

Type:

_ValueWithRank

.. attribute:: aflops

sorted list of _ValueWithRank entries, sorted by flops

Type:

List[_ValueWithRank]

min_elapsed#

‘_ValueWithRank(…)’

max_elapsed#

‘_ValueWithRank(…)’

min_btime#

‘_ValueWithRank(…)’

max_btime#

‘_ValueWithRank(…)’

min_temp#

‘_ValueWithRank(…)’

max_temp#

‘_ValueWithRank(…)’

min_power#

‘_ValueWithRank(…)’

max_power#

‘_ValueWithRank(…)’

min_util#

‘_ValueWithRank(…)’

max_util#

‘_ValueWithRank(…)’

min_clock#

‘_ValueWithRank(…)’

max_clock#

‘_ValueWithRank(…)’

aflops: Union[List[core.utils._ValueWithRank], None]#

None

class core.utils.StragglerDetector#

Singleton class implementing a per-rank straggler detector.

It uses CUDA events to time operations of choice via the start and stop methods, which can be invoked directly on the class instance or used as a Python context manager. After collection, a report() method is available to display the collected metrics. It is supported only if CUDA is available. See megatron/core/README_STRAGGLER.md for more info.

.. note::

The instance and class attributes mentioned below are all private to the class and have no use outside the class.

.. attribute:: _off

current state of the toggle

Type:

bool

.. attribute:: start

start method

Type:

FunctionType

.. attribute:: stop

stop method

Type:

FunctionType

.. attribute:: world

world size

Type:

int

.. attribute:: rank

rank for this instance

Type:

int

.. attribute:: mmcnt

number of ranks to report

Type:

int

.. attribute:: port

control port

Type:

int

.. attribute:: amp

amplification factor for TFLOPs, default 3.0

Type:

float

.. attribute:: toggle

whether to start/stop detector collection

Type:

bool

.. attribute:: bdata

when true, just collect get_batch

Type:

bool

.. attribute:: dev

cuda device

Type:

int

.. attribute:: evt_q

cuda event queue

Type:

LifoQueue

.. attribute:: start_gemm_ev

cuda start event

Type:

list[torch.cuda.Event]

.. attribute:: stop_gemm_ev

cuda stop event

Type:

list[torch.cuda.Event]

.. attribute:: start_data_ev

cuda start event

Type:

list[torch.cuda.Event]

.. attribute:: stop_data_ev

cuda stop event

Type:

list[torch.cuda.Event]

.. attribute:: start_gemm_tm

start time (wallclock)

Type:

list[int]

.. attribute:: stop_gemm_tm

stop time (wallclock)

Type:

list[int]

.. attribute:: start_data_tm

start time for get_batch

Type:

list[int]

.. attribute:: stop_data_tm

stop time for get_batch

Type:

list[int]

.. attribute:: sock

the controller socket

Type:

socket

.. attribute:: ctrlr

the controller thread

Type:

Thread

Initialization

Initializer

The initial state of the StragglerDetector instance is disabled. The enabled state is indicated by the self._off member variable and the property enabled.

_configured#

False

Indicates if the singleton instance is configured or not

__new__() → core.utils.StragglerDetector#

Constructor. Creates an instance of the class if not already created.

Parameters:

cls (Type['StragglerDetector']) – The class type

Returns:

the class instance

Return type:

StragglerDetector

configure(
world: int,
rank: int,
mmcnt: int = 1,
amp: float = 3.0,
port: int = 65535,
prefill: int = 1024,
enabled: bool = False,
) → None#

This method is called to configure the Singleton instance

It should be called once per instantiation per process.

.. note::

The constructor keeps the state of the instance disabled, i.e., no collection will happen even when the start/stop methods are called. Only when enabled is True (self._off is False) do the start/stop method pointers get assigned the real collection methods; otherwise they are initialized with null_method.

Parameters:
  • world (int) – World Size

  • rank (int) – The rank of this trainer

  • mmcnt (int, optional) – Number of ranks to print for showing Min/Max Etpt. Defaults to 1.

  • amp (float, optional) – Set to 3.0 if we only use timers in fwd pass. Defaults to 3.0.

  • port (int, optional) – Control port, useful only for rank-0. Defaults to 65535.

  • prefill (int, optional) – How many Events to pre-populate. Defaults to 1024.

  • enabled (bool, optional) – Whether or not collection is enabled on startup. Defaults to False.
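
A usage sketch with assumed world/rank values; the timed bodies are placeholders:

    from megatron.core.utils import StragglerDetector

    stimer = StragglerDetector()  # singleton
    stimer.configure(world=8, rank=0, enabled=True)

    with stimer:              # GPU + CPU timing for the enclosed section
        ...                   # e.g., forward pass

    with stimer(bdata=True):  # CPU-only timing
        ...                   # e.g., get_batch()

    stimer.report(total_flops=0.0, log_interval=10)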

reset() → None#

This method is called to reset the metrics state of the instance

It is generally called from within elapsed() after extracting per rank metrics.

start_method() → None#

This method adds the start timers.

Both the CUDA event and perf_counter are added. If bdata was set to true via __call__, this method skips inserting the CUDA timer. This way it can be used to measure time spent on the CPU, which is generally useful for timing get_batch().

stop_method() → None#

This method adds the stop timers.

Both the CUDA event and perf_counter are added. If bdata was set to true via __call__, this method skips inserting the CUDA timer. Also see start_method().

elapsed() → Tuple[float, float, int, int, int, int]#

This method is called from report(), or can be called directly

It is called to collect all the elapsed time since the last reset(). It finally calls reset().

Returns:

  • delta – time spent in kernel

  • batch_delta – time spent in get_batch

  • temp – observed GPU temperature

  • power – observed GPU power

  • util – observed GPU utilization

  • clock – observed GPU clock

Return type:

Tuple[float, float, int, int, int, int]

report(total_flops: float = 0.0, log_interval: int = 0) → bool#

Function to log the min/max metrics and the associated ranks over a time period.

It finds the slowest and fastest rank among all ranks. It should be called by all ranks, but only rank-0 prints the analysis. At the end, it checks whether the straggler detector should remain active or be deactivated.

Parameters:
  • total_flops (float, optional) – The theoretical flops over the period. Defaults to 0.0.

  • log_interval (int, optional) – The training interval (ms) over which reporting is called. Defaults to 0.

Returns:

True if reported, else False

Return type:

bool

_check_toggle() → None#

Helper method to check if a request to toggle the collection state was made

It checks if a collection-state toggle request was made via the server listening on rank-0 since the last call to report(). Called by report(). Calling this method indirectly from report() is the only way to activate a change made via rank-0.

_handler() → None#

Thread function for the controller.

It is a TCP server that listens on a port and uses the HTTP protocol. Connecting to it (e.g., with curl) requests a toggle of the collection state. The actual toggling happens at the end of report(), when _check_toggle() is called.

_controller()#

Installs a controller listener that is used to toggle collection state.

Called from configure(). Ignored for all ranks other than rank-0

_min_max(
ptime: float,
btime: float,
temp: float,
power: float,
util: float,
clock: float,
flops: float,
) → Union[core.utils._StragglerData, None]#

Helper function to find the min/max values

Parameters:
  • ptime (float) – avg per iteration gpu time

  • btime (float) – avg per iteration cpu time

  • temp (float) – gpu temp at the time of reporting

  • power (float) – gpu power at the time of reporting

  • util (float) – gpu util at the time of reporting

  • clock (float) – gpu clock at the time of reporting

  • flops (float) – estimated flops for the rank

Returns:

The result contains the min/max of a few metrics and the corresponding ranks; it also has a list of all (flops, rank) pairs sorted by flops (aflops). Returns None if collection is disabled.

Return type:

Union[_StragglerData, None]

property enabled: bool#

Can be called to check the enabled state of the instance

.. note::

After the request to toggle the state, the actual state change happens at end of call to report()

property configured: bool#

Can be called to check if the instance is already configured

Returns:

returns True if configure was called and was a success, else False

Return type:

bool

property my_rank#

Can be called to get configured rank of this instance

Returns:

Configured rank for this instance

Return type:

int

property world_size: int#

Can be called to get configured world of this instance

Returns:

World size configured for this instance

Return type:

int

null_method() → None#

Default method used to initialize the start/stop method pointers.

__enter__() → core.utils.StragglerDetector#

Define context/instance entry

Returns:

the instance

Return type:

StragglerDetector

__call__(bdata: bool = False) → core.utils.StragglerDetector#

Callable for the instance. Sets the context state.

Useful when the context is used for CPU timers only, i.e., when bdata=True.

Parameters:

bdata (bool, optional) – when true, only enables cpu timers. Defaults to False.

Returns:

the instance

Return type:

StragglerDetector

__exit__(
ex_type: Optional[Type[BaseException]],
ex_val: Optional[BaseException],
ex_tb: Optional[types.TracebackType],
) → bool#

Defines context/instance exit; calls the stop method.

Parameters:
  • ex_type (Optional[Type[BaseException]]) – Exception type

  • ex_val (Optional[BaseException]) – Exception value

  • ex_tb (Optional[TracebackType]) – Exception traceback

Returns:

True if the exception was handled

Return type:

bool

core.utils.__straggler__#

‘StragglerDetector(…)’

StragglerDetector: private module variable, not to be accessed directly.

core.utils.is_submodule(module, parent_module, strict=True)#

Check if a module is a submodule of another module.

core.utils.get_batch_on_this_cp_rank(batch: Dict[str, Any])#

Slice batch input along sequence dimension into multiple chunks, which are parallelized across GPUs in a context parallel group.

core.utils._nvtx_enabled: bool#

False

core.utils._nvtx_range_messages: list[str]#

[]

core.utils.configure_nvtx_profiling(enabled: bool) → None#

Configure NVTX range profiling to be enabled or disabled.

Parameters:

enabled (bool) – Whether to enable NVTX range profiling

core.utils._nvtx_range_get_func_path()#

Get the path of a function. Assumes it is being called from nvtx_range_push/pop.

Returns:

Module path and function name joined by a dot

Return type:

str

core.utils.nvtx_range_push(msg=None, suffix=None) → None#

Push NVTX range onto stack. If msg is not provided, use the calling function’s path.

Parameters:
  • msg (str, optional) – Message to associate with range

  • suffix (str, optional) – Suffix to append to the message

core.utils.nvtx_range_pop(msg=None, suffix=None) → None#

Pop NVTX range from stack. If msg is not provided, use the calling function’s path.

Parameters:
  • msg (str, optional) – Message to associate with range

  • suffix (str, optional) – Suffix to append to the message
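
A sketch combining the enable toggle with a push/pop pair; the msg and suffix strings are arbitrary:

    from megatron.core.utils import (
        configure_nvtx_profiling,
        nvtx_range_pop,
        nvtx_range_push,
    )

    configure_nvtx_profiling(True)  # push/pop are no-ops until enabled

    nvtx_range_push(msg="attention", suffix="fwd")
    # ... the work to profile goes here ...
    nvtx_range_pop(msg="attention", suffix="fwd")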

core.utils._nvtx_decorator_get_func_path(func)#

Get the path of a function.

Parameters:

func (Callable) – Function to get path for.

Returns:

Module path and function name joined by a dot

Return type:

str

core.utils.nvtx_decorator(
message: Optional[str] = None,
color: Optional[str] = None,
)#

Decorator to add NVTX range to a function.

Parameters:
  • message (str, optional) – Custom message for the NVTX range. If None, uses function path

  • color (str, optional) – Color for the NVTX range. Defaults to None

Returns:

Decorated function with NVTX profiling if enabled

Return type:

Callable

.. rubric:: Example

    @nvtx_decorator()
    def my_function():
        pass

    @nvtx_decorator(message="Custom Range", color="blue")
    def another_function():
        pass

core.utils.unwrap_model(model, module_instances=None)#

Unwraps the model to return the final model instance.

core.utils.maybe_cat(a, b, dim=0, *, required=False)#

Concatenates a and b along dim if a and b exist.

core.utils._ASYNC_IO_LOOP: asyncio.AbstractEventLoop | None#

None

core.utils.get_asyncio_loop(
loop: asyncio.AbstractEventLoop | None = None,
) → asyncio.AbstractEventLoop#

Creates an asyncio loop if necessary and then returns the current asyncio loop.
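
A minimal sketch; the coroutine is a placeholder:

    import asyncio

    from megatron.core.utils import get_asyncio_loop

    async def sample():
        await asyncio.sleep(0)

    loop = get_asyncio_loop()          # creates a loop if none exists yet
    loop.run_until_complete(sample())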

core.utils._ASYNC_TASK_STATS#

‘defaultdict(…)’

core.utils.trace_async_exceptions(
func: Optional[Callable] = None,
*,
verbose: bool = False,
)#

Decorator to be applied to every coroutine that runs in a separate task.

This is needed because asyncio tasks do not propagate exceptions. Coroutines running inside separate tasks will fail silently if not decorated.

Passing in verbose=True will print additional lifetime logging information about the task. Such functionality is relied on by some users, and can be enabled as shown below:

    @trace_async_exceptions(verbose=True)
    async def my_coroutine(...):
        ...

core.utils.get_mamba_inference_state_config_from_model(
model,
) → Optional[megatron.core.inference.contexts.attention_context.mamba_metadata.MambaInferenceStateConfig]#

Returns Mamba inference state config from the model if it is a hybrid model.

core.utils.deprecated(
version: str,
removal_version: Optional[str] = None,
alternative: Optional[str] = None,
reason: Optional[str] = None,
) → Callable#

Mark a function as deprecated.

This decorator:

  1. Adds deprecation metadata to the function

  2. Issues a DeprecationWarning when the function is called

  3. Allows the compatibility checker to track deprecation lifecycle

Parameters:
  • version – Version where deprecation starts (e.g., “1.0.0”)

  • removal_version – Version where function will be removed (e.g., “2.0.0”)

  • alternative – Name of the recommended replacement function

  • reason – Optional explanation for the deprecation

Returns:

Decorator function

.. rubric:: Example

    @deprecated(
        version="1.0.0",
        removal_version="2.0.0",
        alternative="new_train_model",
        reason="Improved performance and cleaner API",
    )
    def old_train_model(config):
        pass

core.utils.internal_api(func: Callable) → Callable#

Mark a function or class as internal API (not for external use).

Use this decorator for:

  • Internal APIs not intended for public consumption

  • Experimental features that may change without notice

  • Implementation details that are not part of the stable API

Objects marked with this decorator will be exempt from backward compatibility checks.

Parameters:

func – The function or class to mark as internal

Returns:

The original function/class with an internal API marker

.. rubric:: Example

    @internal_api
    def _internal_helper():
        '''For internal use only'''
        pass

    @internal_api
    class ExperimentalFeature:
        '''This API may change without notice'''
        pass

core.utils.experimental_api(func: Callable) → Callable#

Mark a function or class as experimental API.

Use this decorator for:

  • Experimental features that may change without notice

  • New APIs under active development

  • Features that are not yet stable

Objects marked with this decorator will be exempt from backward compatibility checks, allowing rapid iteration during development.

Parameters:

func – The function or class to mark as experimental

Returns:

The original function/class with an experimental API marker

.. rubric:: Example

    @experimental_api
    def new_experimental_feature():
        '''This API is experimental and may change'''
        pass

    @experimental_api
    class ExperimentalModel:
        '''This model is under active development'''
        pass