core.utils#
Utility functions used throughout Megatron core
Module Contents#
Classes#
- `GlobalMemoryBuffer`: Global buffer to avoid dynamic memory allocations. Caller should ensure that buffers of the same name are not used concurrently.
- `GlobalSymmetricMemoryBuffer`: Global symmetric memory buffer used in inference. This buffer is used by mcore-inference’s low-latency NVLS all-gather and reduce-scatter collectives.
- `WrappedTensor`: A wrapper for tensors that enables caller functions to pass an indirect reference to callee functions. By wrapping the tensor, the caller’s direct reference is removed, allowing the tensor to be garbage collected once the callee unwraps and frees it.
- `MakeViewlessTensor`: Autograd function to make a viewless tensor.
- `_ValueWithRank`: Internal class, not for use outside this module.
- `_StragglerData`: Internal dataclass, not for use outside this module.
- `StragglerDetector`: Singleton class implementing a per-rank straggler detector.
Functions#
- `null_decorator`: No-op decorator.
- `experimental_fn`: A decorator that marks a function as experimental. Experimental functions may change quickly and do not guarantee backwards compatibility.
- `experimental_cls`: A decorator that marks a class as experimental. Experimental classes may change quickly and do not guarantee backwards compatibility.
- `get_torch_version`: Get the PyTorch version from `__version__`; if not available, use pip’s. Uses caching.
- `get_te_version`: Get the TE version from `__version__`; if not available, use pip’s. Uses caching.
- `is_te_min_version`: Check if a minimum version of `transformer-engine` is installed.
- `is_torch_min_version`: Check if a minimum version of `torch` is installed.
- `get_fa_version`: Get the Flash Attention version from `__version__`; if not available, use pip’s. Uses caching.
- `is_fa_min_version`: Check if a minimum version of `flash-attn` is installed.
- `get_mamba_version`: Get the mamba version from `__version__`; if not available, use pip’s. Uses caching.
- `is_mamba_min_version`: Check if a minimum version of `mamba_ssm` is installed.
- `get_causal_conv1d_version`: Get the causal_conv1d version from `__version__`; if not available, use pip’s. Uses caching.
- `is_causal_conv1d_min_version`: Check if a minimum version of `causal_conv1d` is installed.
- `check_mamba_sequence_packing_support`: Checks whether `causal_conv1d` and `mamba_ssm` support sequence packing.
- `ensure_divisibility`: Ensure that the numerator is divisible by the denominator.
- `divide`: Ensure that the numerator is divisible by the denominator and return the division value.
- `deprecate_inference_params`: Print a warning for the deprecated `inference_params`.
- `get_tensor_model_parallel_group_if_none`: Issue a deprecation warning if tp_group is None and return the default TP group.
- `get_pg_size`: Get the world size for a distributed group.
- `get_pg_rank`: Get the rank for a distributed group.
- `get_pg_src_rank`: Calculate the global rank corresponding to the first local rank in the given process group.
- `get_attr_wrapped_model`: Get an attribute from a wrapped model. If return_model_obj is true, return the object that has the ‘attr’ attribute; otherwise, return the attribute directly.
- `get_model_type`: Returns the model_type attribute.
- `get_model_xattn`: Returns whether the model has the xattn_needed attribute.
- `get_model_config`: Returns the config attribute; allowed to return None.
- `_kernel_make_viewless_tensor`: Make a viewless tensor.
- `make_viewless_tensor`: Entry point for creating viewless tensors.
- `assert_viewless_tensor`: Assert that a tensor is not a view (i.e., its ‘._base’ field is not set).
- `safely_set_viewless_tensor_data`: Safely set a tensor’s ‘.data’ field.
- `init_method_normal`: Init method based on N(0, sigma).
- `scaled_init_method_normal`: Init method based on N(0, sigma/sqrt(2*num_layers)).
- `log_single_rank`: If torch distributed is initialized, write the log on only one rank.
- `log_on_each_pipeline_stage`: Log on the first rank in each pipeline stage.
- `check_param_hashes_across_dp_replicas`: Computes hashes of all parameters in the model, all-gathers hashes across DP replicas, and then checks for equality between the locally computed hashes and those of other ranks.
- `make_tp_sharded_tensor_for_checkpoint`: Helper for instantiating a ShardedTensor where the `tp_axis` dimension is sharded across the TP group.
- `make_sharded_tensor_for_checkpoint`: Helper for instantiating a non-sharded ShardedTensor (replicated across the TP and DP group).
- `get_full_tensor_if_necessary`: For a DTensor, gets the full tensor if some ranks will not have a local copy.
- `to_local_if_dtensor`: Returns the local shard of the given tensor if it is a DTensor.
- `get_data_parallel_group_if_dtensor`: Gets the data parallel group of the given tensor if it is a DTensor.
- `prepare_input_tensors_for_wgrad_compute`: Ensure grad_output is stored in a contiguous buffer.
- `drain_embedding_wgrad_compute`: Helper for performing embedding wgrad GEMMs during the pipeline drain phase; pipelines the AllGather and the GEMMs.
- `local_multi_tensor_applier`: Multi-tensor op applier.
- `local_multi_tensor_l2_norm`: Computes the L2 norm for a list of contiguous tensors; works as a drop-in replacement for amp_C.multi_tensor_l2norm.
- `local_multi_tensor_scale`: Works as a drop-in replacement for amp_C.multi_tensor_scale.
- `is_submodule`: Check if a module is a submodule of another module.
- `get_batch_on_this_cp_rank`: Slice batch input along the sequence dimension into multiple chunks, which are parallelized across GPUs in a context parallel group.
- `configure_nvtx_profiling`: Configure NVTX range profiling to be enabled or disabled.
- `_nvtx_range_get_func_path`: Get the path of a function. Assumes being called from nvtx_range_push/pop.
- `nvtx_range_push`: Push an NVTX range onto the stack. If msg is not provided, use the calling function’s path.
- `nvtx_range_pop`: Pop an NVTX range from the stack. If msg is not provided, use the calling function’s path.
- `_nvtx_decorator_get_func_path`: Get the path of a function.
- `nvtx_decorator`: Decorator to add an NVTX range to a function.
- `unwrap_model`: Unwrap a model to return the final model instance.
- `maybe_cat`: Concatenates `a` and `b` along `dim` if both `a` and `b` exist.
- `get_asyncio_loop`: Creates an asyncio loop if necessary and then returns the current asyncio loop.
- `trace_async_exceptions`: Decorator to be applied to every coroutine that runs in a separate task.
- `get_mamba_inference_state_config_from_model`: Returns the Mamba inference state config from the model if it is a hybrid model.
- `deprecated`: Mark a function as deprecated.
- `internal_api`: Mark a function or class as internal API (not for external use).
- `experimental_api`: Mark a function or class as experimental API.
Data#
- `__straggler__`: StragglerDetector: private module variable, not to be directly accessed.
API#
- core.utils.logger#
‘getLogger(…)’
- core.utils._te_version#
None
- core.utils._fa_version#
None
- core.utils._mamba_ssm_version#
None
- core.utils._causal_conv1d_version#
None
- core.utils.null_decorator(*args, **kwargs)#
No-op decorator.
- exception core.utils.ExperimentalNotEnabledError#
Bases: Exception
Raised during calls to experimental code when ENABLE_EXPERIMENTAL is not set.
Initialization
Initialize self. See help(type(self)) for accurate signature.
- core.utils.experimental_fn(introduced_with_version: str)#
A decorator that marks a function as experimental. Experimental functions may change quickly and do not guarantee backwards compatibility.
Experimental functions have a limited lifetime and should either be productionized or deprecated.
- Parameters:
introduced_with_version (str) – A version-like string of Mcore at time of introduction.
- Raises:
ExperimentalNotEnabledError – Error raised when experimental function was called without enabling the experimental flag.
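A minimal usage sketch. The function name, the version string, and the `megatron.core.utils` import path are illustrative; the only documented decorator argument is `introduced_with_version`:

```python
from megatron.core.utils import ExperimentalNotEnabledError, experimental_fn

# Hypothetical experimental helper; the version string is illustrative.
@experimental_fn(introduced_with_version="0.13.0")
def fused_rope_variant(x):
    return x * 2

try:
    fused_rope_variant(3)
except ExperimentalNotEnabledError:
    # Raised when the experimental flag (ENABLE_EXPERIMENTAL) is not enabled.
    pass
```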
- core.utils.experimental_cls(introduced_with_version: str)#
A decorator that marks a class as experimental. Experimental classes may change quickly and do not guarantee backwards compatibility.
Experimental classes have a limited lifetime and should either be productionized or deprecated.
- Parameters:
introduced_with_version (str) – A version-like string of Mcore at time of introduction.
- Raises:
ExperimentalNotEnabledError – Error raised when experimental class was called without enabling the experimental flag.
- core.utils.get_torch_version()#
Get the PyTorch version from `__version__`; if not available, use pip’s. Uses caching.
- core.utils.get_te_version()#
Get the TE version from `__version__`; if not available, use pip’s. Uses caching.
- core.utils.is_te_min_version(version, check_equality=True)#
Check if a minimum version of `transformer-engine` is installed.
- core.utils.is_torch_min_version(version, check_equality=True)#
Check if a minimum version of `torch` is installed.
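For illustration, a hedged sketch of how these checks are typically used to gate version-dependent code paths (the version strings below are arbitrary examples):

```python
from megatron.core.utils import (
    get_torch_version,
    is_te_min_version,
    is_torch_min_version,
)

print("running with torch", get_torch_version())

# Gate optional code paths on the installed package versions.
if is_torch_min_version("2.1.0"):
    pass  # use a newer torch API here
if is_te_min_version("1.7", check_equality=True):
    pass  # enable a Transformer Engine specific optimization
```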
- core.utils.get_fa_version()#
Get the Flash Attention version from `__version__`; if not available, use pip’s. Uses caching.
- core.utils.is_fa_min_version(version, check_equality=True)#
Check if a minimum version of `flash-attn` is installed.
- core.utils.get_mamba_version()#
Get the mamba version from `__version__`; if not available, use pip’s. Uses caching.
- core.utils.is_mamba_min_version(version, check_equality=True)#
Check if a minimum version of `mamba_ssm` is installed.
- core.utils.get_causal_conv1d_version()#
Get the causal_conv1d version from `__version__`; if not available, use pip’s. Uses caching.
- core.utils.is_causal_conv1d_min_version(version, check_equality=True)#
Check if a minimum version of `causal_conv1d` is installed.
- core.utils.check_mamba_sequence_packing_support() Tuple[bool, Optional[str]]#
Checks whether `causal_conv1d` and `mamba_ssm` support sequence packing.
- core.utils.ensure_divisibility(numerator, denominator)#
Ensure that numerator is divisible by the denominator.
- core.utils.divide(numerator, denominator)#
Ensure that numerator is divisible by the denominator and return the division value.
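A small sketch of the typical pattern (a hidden size split across tensor-parallel ranks; the numbers are arbitrary):

```python
from megatron.core.utils import divide, ensure_divisibility

hidden_size, tp_size = 4096, 8
ensure_divisibility(hidden_size, tp_size)     # errors out if 4096 % 8 != 0
per_partition = divide(hidden_size, tp_size)  # 512, after the same divisibility check
```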
- core.utils.deprecate_inference_params(inference_context, inference_params)#
Print a warning for the deprecated `inference_params`.
- core.utils.get_tensor_model_parallel_group_if_none(
- tp_group,
- is_expert=False,
- check_initialized=True,
Issue a deprecation warning if tp_group is None and return the default tp group.
- core.utils.get_pg_size(group=None)#
Get world size for a distributed group.
- Parameters:
group – Process group to get world size for. If None, uses default group.
- Returns:
World size (1 if distributed not initialized or group is None, else group.size())
- Return type:
int
- core.utils.get_pg_rank(group=None)#
Get rank for a distributed group.
- Parameters:
group – Process group to get rank for. If None, uses default group.
- Returns:
Rank (0 if distributed not initialized or group is None, else group.rank())
- Return type:
int
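A hedged sketch of how these helpers behave; they are safe to call whether or not torch.distributed has been initialized:

```python
from megatron.core.utils import get_pg_rank, get_pg_size

group = None  # e.g. a tensor-parallel ProcessGroup; None falls back to the default group
world_size = get_pg_size(group)  # 1 when distributed is not initialized
rank = get_pg_rank(group)        # 0 when distributed is not initialized
```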
- core.utils.get_pg_src_rank(group=None)#
Calculate the global rank corresponding to the first local rank in the given process group.
- Parameters:
group – Process group to query. If None or distributed is not initialized, returns 0.
- Returns:
The first (source) global rank in the group.
- Return type:
int
- core.utils.get_attr_wrapped_model(
- model,
- attr,
- allow_none=True,
- return_model_obj=False,
Get an attribute from a wrapped model. If return_model_obj is true, return the object that has the ‘attr’ attribute; otherwise, return the attribute directly.
- core.utils.get_model_type(model)#
Returns model_type attribute
- core.utils.get_model_xattn(model)#
Returns whether the model has the xattn_needed attribute
- core.utils.get_model_config(model)#
Returns the config attribute, allowed to return None
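A sketch of the unwrapping helpers. The toy wrapper classes below are placeholders standing in for DDP- or Float16Module-style wrappers that expose a `.module` attribute:

```python
import torch

from megatron.core.utils import get_attr_wrapped_model, get_model_config

class _Inner(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.config = {"hidden_size": 1024}  # placeholder config object

class _Wrapper(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.module = _Inner()  # wrappers expose the wrapped model via `.module`

model = _Wrapper()
config = get_attr_wrapped_model(model, "config")  # walks through `.module` wrappers
same = get_model_config(model)                    # convenience accessor for the 'config' attribute
```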
- class core.utils.GlobalMemoryBuffer#
Global buffer to avoid dynamic memory allocations. Caller should ensure that buffers of the same name are not used concurrently.
Initialization
- get_tensor(
- tensor_shape,
- dtype,
- name,
- mem_alloc_context: Optional[Callable] = None,
Returns (potentially) a sub-tensor from the self.buffer for the given shape.
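A usage sketch, assuming a CUDA device is available; the buffer name "mpu" is arbitrary, and the caller is responsible for not reusing a name concurrently:

```python
import torch

from megatron.core.utils import GlobalMemoryBuffer

buf = GlobalMemoryBuffer()
# Repeated requests under the same name reuse one backing allocation, so the
# returned tensor is a view into shared workspace rather than a fresh allocation.
workspace = buf.get_tensor((1024, 4096), torch.bfloat16, "mpu")
```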
- class core.utils.GlobalSymmetricMemoryBuffer(size_in_mb, process_group)#
Global symmetric memory buffer used in inference. This buffer is used by mcore-inference’s low-latency NVLS all-gather and reduce-scatter collectives.
Initialization
- _can_allocate(numel, dtype) bool#
Returns whether enough symmetric memory is available for the given tensor shape and dtype.
- _allocate(numel, dtype) torch.Tensor#
Allocates a sub-tensor from the self.symm_buffer for the given numel and dtype
- maybe_get_tensor(tensor_shape, dtype)#
Returns (potentially) a sub-tensor from the self.symm_buffer for the given shape. If enough symmetric memory is not available, returns None.
- core.utils._kernel_make_viewless_tensor(inp, requires_grad)#
Make a viewless tensor.
View tensors have the undesirable side-effect of retaining a reference to the originally-viewed tensor, even after manually setting the ‘.data’ field. This method creates a new tensor that links to the old tensor’s data, without linking to the viewed tensor, which is referenced via the ‘._base’ field.
- class core.utils.WrappedTensor(tensor: torch.Tensor)#
A wrapper for tensors that enables caller functions to pass an indirect reference to callee functions. By wrapping the tensor, the caller’s direct reference is removed, allowing the tensor to be garbage collected once the callee unwraps and frees it.
Initialization
- unwrap()#
Returns the wrapped tensor while deleting the internal reference. Can only be called once.
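A sketch of the intended hand-off pattern; the consumer function below is illustrative:

```python
import torch

from megatron.core.utils import WrappedTensor

def consumer(wrapped: WrappedTensor) -> torch.Tensor:
    hidden = wrapped.unwrap()  # can only be called once; clears the wrapper's reference
    out = hidden.sum()
    del hidden                 # the tensor is now free to be garbage collected
    return out

# The caller hands over only the wrapper, not a direct reference, so the callee
# controls the tensor's lifetime after unwrap().
result = consumer(WrappedTensor(torch.randn(8, 16)))
```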
- class core.utils.MakeViewlessTensor#
Bases: torch.autograd.Function
Autograd function to make a viewless tensor.
This function should be used in cases where the computation graph needs to be propagated, but we only want a viewless tensor (e.g., ParallelTransformer’s hidden_states). Call this function by passing ‘keep_graph = True’ to ‘make_viewless_tensor()’.
- static forward(ctx, inp, requires_grad)#
Runs the fwd pass of _kernel_make_viewless_tensor
- static backward(ctx, grad_output)#
No-op
- core.utils.make_viewless_tensor(inp, requires_grad, keep_graph)#
Entry-point for creating viewless tensors.
This method should be used, rather than calling ‘MakeViewlessTensor’ or ‘_kernel_make_viewless_tensor’ directly. This method acts as a switch for determining if an autograd function or a regular method should be used to create the tensor.
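A small sketch showing the difference between a view and the viewless tensor returned by this entry point (behavior as described above; internals may differ):

```python
import torch

from megatron.core.utils import assert_viewless_tensor, make_viewless_tensor

x = torch.randn(4, 4, requires_grad=True)
v = x.view(16)                 # v._base is x, so v keeps x alive
out = make_viewless_tensor(v, requires_grad=True, keep_graph=True)

assert v._base is not None     # the original view still references x
assert_viewless_tensor(out)    # out shares data with v but has no '._base'
```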
- core.utils.assert_viewless_tensor(tensor, extra_msg=None)#
Assert that a tensor is not a view (i.e., its ‘._base’ field is not set).
- core.utils.safely_set_viewless_tensor_data(tensor, new_data_tensor)#
Safely set tensor’s ‘.data’ field.
Check first that the tensor is viewless (i.e., ‘._base’ not set). If not, raise an exception.
- core.utils.init_method_normal(sigma)#
Init method based on N(0, sigma).
- core.utils.scaled_init_method_normal(sigma, num_layers, multiplier=2.0)#
Init method based on N(0, sigma/sqrt(2*num_layers)).
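A brief sketch of how the two initializers relate; sigma and num_layers are illustrative, and the scaled variant uses a standard deviation of sigma / sqrt(2 * num_layers):

```python
import torch

from megatron.core.utils import init_method_normal, scaled_init_method_normal

sigma, num_layers = 0.02, 24
init_fn = init_method_normal(sigma)                              # N(0, sigma)
out_proj_init_fn = scaled_init_method_normal(sigma, num_layers)  # N(0, sigma / sqrt(2 * num_layers))

# Both return callables that fill a weight tensor in place.
weight = torch.empty(1024, 1024)
out_proj_init_fn(weight)
```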
- core.utils.log_single_rank(
- logger: logging.Logger,
- *args: Any,
- rank: int = 0,
- **kwargs: Any,
If torch distributed is initialized, write log on only one rank
- Parameters:
logger (logging.Logger) – The logger to write the logs
args (Tuple[Any]) – All logging.Logger.log positional arguments
rank (int, optional) – The rank to write on. Defaults to 0.
kwargs (Dict[str, Any]) – All logging.Logger.log keyword arguments
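A usage sketch; the positional arguments after the logger are forwarded to logging.Logger.log, so the first one is the log level:

```python
import logging

from megatron.core.utils import log_single_rank

logger = logging.getLogger(__name__)

# Logs only on rank 0 when torch.distributed is initialized; otherwise logs normally.
log_single_rank(logger, logging.INFO, "loaded checkpoint in %.1f s", 12.3)
# Log on a specific rank instead of rank 0.
log_single_rank(logger, logging.WARNING, "running with a debug build", rank=1)
```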
- core.utils.log_on_each_pipeline_stage(
- logger: logging.Logger,
- *args: Any,
- tp_group: Optional[torch.distributed.ProcessGroup] = None,
- dp_cp_group: Optional[torch.distributed.ProcessGroup] = None,
- **kwargs: Any,
Log on first rank in each pipeline stage
- Parameters:
logger (logging.Logger) – The logger to write the logs
args (Tuple[Any]) – All logging.Logger.log positional arguments
kwargs (Dict[str, Any]) – All logging.Logger.log keyword arguments
- core.utils.check_param_hashes_across_dp_replicas(
- model: List[torch.nn.Module],
- cross_check: bool = False,
Computes hashes of all parameters in model, all-gathers hashes across DP replicas, and then checks for equality between the locally-computed hashes and those of other ranks.
NOTE: This function computes SHA-1 hashes on the CPU and thus needs to move all param tensors from GPU to CPU first; as a result, this function is not intended to be called very frequently in the main training loop.
- Parameters:
model (List[torch.nn.Module]) – List of model chunks whose parameter hashes need to be checked.
cross_check (bool) – If true, will check whether hashes match across all DP replicas.
- Returns:
True if all param hashes match with corresponding hash on DP replica 0 or across all replicas if cross_check is enabled, False otherwise.
- core.utils.make_tp_sharded_tensor_for_checkpoint(
- tensor,
- key,
- tp_axis=0,
- replica_id=None,
- prepend_offsets=(),
- **kwargs,
Helper for instantiating a ShardedTensor where the `tp_axis` dimension is sharded across the TP group.
Optionally, can provide offsets which prepend new dimensions to the tensor.
- Parameters:
tensor – Tensor to shard
key – Key for the sharded tensor
tp_axis – Axis to shard across tensor parallel group (default: 0)
replica_id – Replica ID for the tensor (default: None)
prepend_offsets – Offsets to prepend to tensor dimensions (default: ())
**kwargs –
Additional arguments. May include:
tp_group: Tensor parallel group (default: None, falls back to parallel_state)
dp_cp_group: Data parallel + context parallel group (default: None, falls back to parallel_state)
- core.utils.make_sharded_tensor_for_checkpoint(
- tensor,
- key,
- prepend_offsets=(),
- replica_id=None,
- **kwargs,
Helper for instantiating a non-sharded ShardedTensor (replicated across TP and DP group).
Optionally, can provide offsets which prepend new dimensions to the tensor.
- Keyword Arguments:
tensor – Tensor to create sharded tensor for
key – Key for the sharded tensor
prepend_offsets – Offsets to prepend to tensor dimensions (default: ())
replica_id – Replica ID for the tensor (default: None)
**kwargs –
Additional arguments. May include:
tp_group: Tensor parallel group (default: None, falls back to parallel_state)
dp_cp_group: Data parallel + context parallel group (default: None, falls back to parallel_state)
- core.utils.get_full_tensor_if_necessary(tensor)#
For a DTensor, gets the full tensor if some ranks will not have a local copy.
- core.utils.to_local_if_dtensor(
- tensor: Union[torch.Tensor, torch.distributed._tensor.DTensor],
Returns the local shard of the given tensor if it is a DTensor.
- core.utils.get_data_parallel_group_if_dtensor(
- tensor: Union[torch.Tensor, torch.distributed._tensor.DTensor],
- data_parallel_group: ProcessGroup = None,
Gets the data parallel group of the given tensor if it is a DTensor.
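A sketch showing that both helpers are transparent for plain tensors; the interesting case, an actual DTensor, requires an initialized device mesh and is omitted here:

```python
import torch

from megatron.core.utils import (
    get_data_parallel_group_if_dtensor,
    to_local_if_dtensor,
)

param = torch.randn(16, 16)         # a regular tensor passes through unchanged
local = to_local_if_dtensor(param)  # same tensor for non-DTensor inputs
group = get_data_parallel_group_if_dtensor(param, data_parallel_group=None)  # stays None here
```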
- core.utils.prepare_input_tensors_for_wgrad_compute(
- grad_output,
- all_gathered_input,
Ensure grad_output is stored in a contiguous buffer.
- core.utils.drain_embedding_wgrad_compute(
- config,
- embedding_activation_buffer,
- grad_output_buffer,
- weight,
- tp_group,
Helper for performing embedding wgrad GEMM’s during the pipeline drain phase, pipelines the AllGather and GEMM’s.
Should only be used when pipeline model parallelism and gradient accumulation fusion are enabled.
- core.utils.local_multi_tensor_applier(op, noop_flag_buffer, tensor_lists, *args)#
Multi-tensor op applier.
- core.utils.local_multi_tensor_l2_norm(
- chunk_size,
- noop_flag,
- tensor_lists,
- per_tensor,
- *args,
Computes the L2 norm for a list of contiguous tensors; works as a drop-in replacement for amp_C.multi_tensor_l2norm.
- core.utils.local_multi_tensor_scale(chunk_size, noop_flag, tensor_lists, scale)#
Works as a drop-in replacement for amp_C.multi_tensor_scale.
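A hedged sketch of the calling convention, which mirrors the apex amp_C kernels these helpers are documented to replace (the tensor lists and the scale factor are arbitrary):

```python
import torch

from megatron.core.utils import (
    local_multi_tensor_applier,
    local_multi_tensor_l2_norm,
    local_multi_tensor_scale,
)

grads = [torch.randn(32) for _ in range(4)]
noop_flag = torch.zeros(1, dtype=torch.int)

# Grad-norm computation, apex-style: op(chunk_size, noop_flag, tensor_lists, per_tensor).
norm_result = local_multi_tensor_applier(local_multi_tensor_l2_norm, noop_flag, [grads], False)

# In-place scaling of the gradient list by 0.5 (source list, destination list, scale).
local_multi_tensor_applier(local_multi_tensor_scale, noop_flag, [grads, grads], 0.5)
```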
- class core.utils._ValueWithRank(value: float, rank: int, unit: str = '')#
This is an internal class, not for use outside this module
.. attribute:: _rank
rank for the value
- Type:
int
.. attribute:: _value
the value it stores, e.g., elapsed time
- Type:
float
.. attribute:: _unit
unit for the value
- Type:
str
Initialization
Initializer
- Parameters:
_value (float) – the initial value with which it is initialized
_rank (int) – the rank number
_unit (str) – the unit of the value, e.g., ms or flops
- __lt__(other) bool#
Check if value of self is smaller than other’s value
- Parameters:
other (_ValueWithRank) – The other object to compare with
- Returns:
True if lhs._value of operand is less than rhs._value, else False
- Return type:
bool
- __gt__(other) bool#
Check if value of self is larger than other’s value
- Parameters:
other (_ValueWithRank) – The other object to compare with
- Returns:
True if lhs._value of operand is greater than rhs._value, else False
- Return type:
bool
- __call__() Tuple[float, int, str]#
Returns the value, the rank, and unit as a Tuple
- Returns:
value, rank, unit
- Return type:
Tuple[float, int, str]
- __str__() str#
String representation of the object
- Returns:
stringified object
- Return type:
str
- class core.utils._StragglerData#
This is an internal dataclass, not for use outside this module
.. attribute:: min_elapsed
min elapsed (kernel) time across all ranks
- Type:
_ValueWithRank
.. attribute:: max_elapsed
max elapsed (kernel) time across all ranks
- Type:
_ValueWithRank
.. attribute:: min_btime
min get_batch time across all ranks
- Type:
_ValueWithRank
.. attribute:: max_btime
max get_batch time across all ranks
- Type:
_ValueWithRank
.. attribute:: min_temp
min gpu temp across all ranks
- Type:
_ValueWithRank
.. attribute:: max_temp
max gpu temp across all ranks
- Type:
_ValueWithRank
.. attribute:: min_power
min gpu power across all ranks
- Type:
_ValueWithRank
.. attribute:: max_power
max gpu power across all ranks
- Type:
_ValueWithRank
.. attribute:: min_util
min gpu util across all ranks
- Type:
_ValueWithRank
.. attribute:: max_util
max gpu util across all ranks
- Type:
_ValueWithRank
.. attribute:: min_clock
min gpu clock across all ranks
- Type:
_ValueWithRank
.. attribute:: max_clock
max gpu clock across all ranks
- Type:
_ValueWithRank
.. attribute:: aflops
sorted array of (_ValueWithRank)
- Type:
List[_ValueWithRank]
- min_elapsed#
‘_ValueWithRank(…)’
- max_elapsed#
‘_ValueWithRank(…)’
- min_btime#
‘_ValueWithRank(…)’
- max_btime#
‘_ValueWithRank(…)’
- min_temp#
‘_ValueWithRank(…)’
- max_temp#
‘_ValueWithRank(…)’
- min_power#
‘_ValueWithRank(…)’
- max_power#
‘_ValueWithRank(…)’
- min_util#
‘_ValueWithRank(…)’
- max_util#
‘_ValueWithRank(…)’
- min_clock#
‘_ValueWithRank(…)’
- max_clock#
‘_ValueWithRank(…)’
- aflops: Union[List[core.utils._ValueWithRank], None]#
None
- class core.utils.StragglerDetector#
Singleton class implementing a per-rank straggler detector.
It uses CUDA events to time operations of choice via the start and stop methods, which can be invoked directly on the class instance or used as a Python context manager. After collection, a report() method is available to display the collected metrics. It is only supported if CUDA is available. See megatron/core/README_STRAGGLER.md for more info.
.. note::
The instance and class attributes mentioned below are all private to the class and have no use outside the class.
.. attribute:: _off
current state of the toggle
- Type:
bool
.. attribute:: start
start method
- Type:
FunctionType
.. attribute:: stop
stop method
- Type:
FunctionType
.. attribute:: world
world size
- Type:
int
.. attribute:: rank
rank for this instance
- Type:
int
.. attribute:: mmcnt
number of ranks to report
- Type:
int
.. attribute:: port
control port
- Type:
int
.. attribute:: amp
amplification factor for TFLOPs, default 3.0
- Type:
float
.. attribute:: toggle
whether to start/stop detector collection
- Type:
bool
.. attribute:: bdata
when true, just collect get_batch
- Type:
bool
.. attribute:: dev
cuda device
- Type:
int
.. attribute:: evt_q
cuda event queue
- Type:
LifoQueue
.. attribute:: start_gemm_ev
cuda start event
- Type:
list[torch.cuda.Event]
.. attribute:: stop_gemm_ev
cuda stop event
- Type:
list[torch.cuda.Event]
.. attribute:: start_data_ev
cuda start event
- Type:
list[torch.cuda.Event]
.. attribute:: stop_data_ev
cuda stop event
- Type:
list[torch.cuda.Event]
.. attribute:: start_gemm_tm
start time (wallclock)
- Type:
list[int]
.. attribute:: stop_gemm_tm
stop time (wallclock)
- Type:
list[int]
.. attribute:: start_data_tm
start time for get_batch
- Type:
list[int]
.. attribute:: stop_data_tm
stop time for get_batch
- Type:
list[int]
.. attribute:: sock
the controller socket
- Type:
socket
.. attribute:: ctrlr
the controller thread
- Type:
Thread
Initialization
Initializer
The initial state of the StragglerDetector instance is disabled. The enabled state is indicated using the self._off member variable and the property enabled.
- _configured#
False
Indicates if the singleton instance is configured or not
- __new__() core.utils.StragglerDetector#
Constructor. Creates an instance of the class if not already created.
- Parameters:
cls (Type['StragglerDetector']) – The class type
- Returns:
the class instance
- Return type:
StragglerDetector
- configure(
- world: int,
- rank: int,
- mmcnt: int = 1,
- amp: float = 3.0,
- port: int = 65535,
- prefill: int = 1024,
- enabled: bool = False,
This method is called to configure the Singleton instance
It should be called once per instantiation per process.
.. note::
The constructor keeps the state of the instance disabled, i.e., no collection will happen even when the start/stop methods are called. Only when enabled is True (self._off is False) do the start/stop method pointers get assigned the real collection methods; otherwise they are initialized with null_method.
- Parameters:
world (int) – World Size
rank (int) – The rank of this trainer
mmcnt (int, optional) – Number of ranks to print for showing Min/Max Etpt. Defaults to 1.
amp (float, optional) – Set to 3.0 if we only use timers in fwd pass. Defaults to 3.0.
port (int, optional) – Control port, useful only for rank-0. Defaults to 65535.
prefill (int, optional) – How many Events to pre-populate. Defaults to 1024.
enabled (bool, optional) – Whether or not collection is enabled on startup. Defaults to False.
- reset() None#
This method is called to reset the metrics state of the instance
It is generally called from within elapsed() after extracting per rank metrics.
- start_method() None#
This method adds the start timers.
Both the CUDA event and perf_counter are added. If bdata is set to true via __call__(), this method skips inserting the CUDA timer. This way it can be used to measure time spent on the CPU, which is generally useful for timing get_batch().
- stop_method() None#
This method adds the stop timers.
Both the CUDA event and perf_counter are added. If bdata is set to true via __call__(), this method skips inserting the CUDA timer. Also see start_method().
- elapsed() Tuple[float, float, int, int, int, int]#
This method is called from report(), or can be called directly
It is called to collect all the elapsed time since last reset(). It finally calls reset()
- Returns:
delta: time spent in the kernel
batch_delta: time spent in get_batch
temp: observed gpu temp
power: observed gpu power
util: observed gpu utilization
clock: observed gpu clock
- Return type:
Tuple[float, float, int, int, int, int]
- report(total_flops: float = 0.0, log_interval: int = 0) bool#
Function to log the min/max metrics and the associated rank over a time period.
It finds the slowest and fastest rank among all ranks. It should be called by all ranks, but only rank-0 prints the analysis. At the end, it checks whether the straggler detector should remain active or be deactivated.
- Parameters:
total_flops (float, optional) – The theoretical flops over the period. Defaults to 0.0.
log_interval (int, optional) – The training interval over which reporting is called (ms). Defaults to 0.
- Returns:
True if reported, else False
- Return type:
bool
- _check_toggle() None#
Helper method to check if a request to toggle the collection state was made
It checks if a collection state toggle request was made via the server listening on rank-0 since the last call to report(). Called by report(). Calling this method indirectly from report() is the only way to activate a change that is made via rank-0.
- _handler() None#
Thread function for the controller.
It is a TCP server that listens on a port using the HTTP protocol. Connecting to it (e.g., with curl) toggles the collection state. The actual toggling happens at the end of calling report(), when _check_toggle() is called.
- _controller()#
Installs a controller listener that is used to toggle collection state.
Called from configure(). Ignored for all ranks other than rank-0
- _min_max(
- ptime: float,
- btime: float,
- temp: float,
- power: float,
- util: float,
- clock: float,
- flops: float,
Helper function to find the min/max values
- Parameters:
ptime (float) – avg per iteration gpu time
btime (float) – avg per iteration cpu time
temp (float) – gpu temp at the time of reporting
power (float) – gpu power at the time of reporting
util (float) – gpu util at the time of reporting
clock (float) – gpu clock at the time of reporting
flops (float) – estimated flops for the rank
- Returns:
It contains the min/max of a few metrics and the corresponding rank; it also has a sorted list of all (flops, rank) pairs sorted by flops (aflops). Returns None if collection is disabled.
- Return type:
Union[_StragglerData, None]
- property enabled: bool#
Can be called to check the enabled state of the instance
.. note::
After the request to toggle the state, the actual state change happens at end of call to report()
- property configured: bool#
Can be called to check if the instance is already configured
- Returns:
returns True if configure was called and was a success, else False
- Return type:
bool
- property my_rank#
Can be called to get configured rank of this instance
- Returns:
Configured rank for this instance
- Return type:
int
- property world_size: int#
Can be called to get configured world of this instance
- Returns:
World size configured for this instance
- Return type:
int
- null_method() None#
Default method to initialize start/stop method ptrs
- __enter__() core.utils.StragglerDetector#
Define context/instance entry
- Returns:
the instance
- Return type:
StragglerDetector
- __call__(bdata: bool = False) core.utils.StragglerDetector#
Callable for the instance. Sets the context state.
Useful when the context is used for CPU timers only (bdata=True).
- Parameters:
bdata (bool, optional) – when true, only enables cpu timers. Defaults to False.
- Returns:
the instance
- Return type:
StragglerDetector
- __exit__(
- ex_type: Optional[Type[BaseException]],
- ex_val: Optional[BaseException],
- ex_tb: Optional[types.TracebackType],
Define context/instance exit, calls the stop method
- Parameters:
ex_type (Optional[Type[BaseException]]) – Exception type
ex_val (Optional[BaseException]) – Exception value
ex_tb (Optional[TracebackType]) – Exception traceback
- Returns:
True if the exception was handled
- Return type:
bool
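A hedged sketch of a typical training-loop integration. `world_size`, `rank`, `train_iters`, `get_batch`, `train_step`, and the flops/interval values are placeholders from the surrounding training script, and configuration requires a CUDA-capable setup:

```python
from megatron.core.utils import StragglerDetector

stimer = StragglerDetector()  # singleton: repeated construction returns the same instance
stimer.configure(world=world_size, rank=rank, mmcnt=1, enabled=True)

for iteration in range(train_iters):
    with stimer(bdata=True):          # CPU-only timing, e.g. around get_batch()
        batch = get_batch(data_iterator)
    with stimer:                      # CUDA-event timing around the compute section
        loss = train_step(batch)
    if (iteration + 1) % log_interval == 0:
        # Placeholder flops/interval values; report() prints only on rank-0.
        stimer.report(total_flops=flops_since_last_report, log_interval=log_interval_ms)
```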
- core.utils.__straggler__#
‘StragglerDetector(…)’
StragglerDetector: private module variable, not to be directly accessed.
- core.utils.is_submodule(module, parent_module, strict=True)#
Check if a module is a submodule of another module.
- core.utils.get_batch_on_this_cp_rank(batch: Dict[str, Any])#
Slice batch input along sequence dimension into multiple chunks, which are parallelized across GPUs in a context parallel group.
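A minimal sketch, assuming Megatron's parallel state (including the context-parallel group) has already been initialized; the tensor shapes are illustrative [batch, seq] inputs:

```python
import torch

from megatron.core.utils import get_batch_on_this_cp_rank

batch = {
    "tokens": torch.randint(0, 1000, (2, 4096)),
    "position_ids": torch.arange(4096).repeat(2, 1),
}
# Each value is sliced along the sequence dimension so this rank keeps only the
# chunks it owns within its context-parallel group (a no-op when CP size is 1).
batch = get_batch_on_this_cp_rank(batch)
```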
- core.utils._nvtx_enabled: bool#
False
- core.utils._nvtx_range_messages: list[str]#
[]
- core.utils.configure_nvtx_profiling(enabled: bool) None#
Configure NVTX range profiling to be enabled or disabled.
- Parameters:
enabled (bool) – Whether to enable NVTX range profiling
- core.utils._nvtx_range_get_func_path()#
Get the path of a function. Assumes being called from nvtx_range_push/pop.
- Returns:
Module path and function name joined by a dot
- Return type:
str
- core.utils.nvtx_range_push(msg=None, suffix=None) None#
Push NVTX range onto stack. If msg is not provided, use the calling function’s path.
- Parameters:
msg (str, optional) – Message to associate with range
suffix (str, optional) – Suffix to append to the message
- core.utils.nvtx_range_pop(msg=None, suffix=None) None#
Pop NVTX range from stack. If msg is not provided, use the calling function’s path.
- Parameters:
msg (str, optional) – Message to associate with range
suffix (str, optional) – Suffix to append to the message
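A usage sketch; ranges are no-ops unless NVTX profiling has been enabled, and the suffix shown is arbitrary:

```python
from megatron.core.utils import (
    configure_nvtx_profiling,
    nvtx_range_pop,
    nvtx_range_push,
)

configure_nvtx_profiling(True)

def attention_forward(x):
    nvtx_range_push(suffix="qkv")   # message defaults to the calling function's path
    # ... compute the qkv projections here ...
    nvtx_range_pop(suffix="qkv")
    return x
```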
- core.utils._nvtx_decorator_get_func_path(func)#
Get the path of a function.
- Parameters:
func (Callable) – Function to get path for.
- Returns:
Module path and function name joined by a dot
- Return type:
str
- core.utils.nvtx_decorator(
- message: Optional[str] = None,
- color: Optional[str] = None,
Decorator to add NVTX range to a function.
- Parameters:
message (str, optional) – Custom message for the NVTX range. If None, uses function path
color (str, optional) – Color for the NVTX range. Defaults to None
- Returns:
Decorated function with NVTX profiling if enabled
- Return type:
Callable
.. rubric:: Example
@nvtx_decorator()
def my_function():
    pass

@nvtx_decorator(message="Custom Range", color="blue")
def another_function():
    pass
- core.utils.unwrap_model(model, module_instances=None)#
Unwrap the model to return the final model instance.
- core.utils.maybe_cat(a, b, dim=0, *, required=False)#
Concatenates `a` and `b` along `dim` if both `a` and `b` exist.
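A small sketch of the intended behavior as described above; exact handling of missing inputs and of `required` should be checked against the implementation:

```python
import torch

from megatron.core.utils import maybe_cat

a = torch.randn(2, 8)
b = torch.randn(3, 8)
both = maybe_cat(a, b, dim=0)  # behaves like torch.cat([a, b], dim=0) when both exist
only_a = maybe_cat(a, None)    # no concatenation is performed when one input is missing
```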
- core.utils._ASYNC_IO_LOOP: asyncio.AbstractEventLoop | None#
None
- core.utils.get_asyncio_loop(
- loop: asyncio.AbstractEventLoop | None = None,
Creates an asyncio loop if necessary and then returns the current asyncio loop.
- core.utils._ASYNC_TASK_STATS#
‘defaultdict(…)’
- core.utils.trace_async_exceptions(
- func: Optional[Callable] = None,
- *,
- verbose: bool = False,
Decorator to be applied to every coroutine that runs in a separate task.
This is needed because asyncio tasks do not propagate exceptions. Coroutines running inside separate tasks will fail silently if not decorated.
Passing in `verbose=True` will print additional lifetime logging information about the task. Such functionality is relied on by some users, and can be enabled as shown below:

@trace_async_exceptions(verbose=True)
async def my_coroutine(...):
    ...
- core.utils.get_mamba_inference_state_config_from_model(
- model,
Returns Mamba inference state config from the model if it is a hybrid model.
- core.utils.deprecated(
- version: str,
- removal_version: Optional[str] = None,
- alternative: Optional[str] = None,
- reason: Optional[str] = None,
Mark a function as deprecated.
This decorator:
- Adds deprecation metadata to the function
- Issues a DeprecationWarning when the function is called
- Allows the compatibility checker to track the deprecation lifecycle
- Parameters:
version – Version where deprecation starts (e.g., “1.0.0”)
removal_version – Version where function will be removed (e.g., “2.0.0”)
alternative – Name of the recommended replacement function
reason – Optional explanation for the deprecation
- Returns:
Decorator function
.. rubric:: Example
@deprecated(
    version="1.0.0",
    removal_version="2.0.0",
    alternative="new_train_model",
    reason="Improved performance and cleaner API",
)
def old_train_model(config):
    pass
- core.utils.internal_api(func: Callable) Callable#
Mark a function or class as internal API (not for external use).
Use this decorator for:
- Internal APIs not intended for public consumption
- Experimental features that may change without notice
- Implementation details that are not part of the stable API
Objects marked with this decorator will be exempt from backward compatibility checks.
- Parameters:
func – The function or class to mark as internal
- Returns:
The original function/class with an internal API marker
.. rubric:: Example
@internal_api
def _internal_helper():
    '''For internal use only'''
    pass

@internal_api
class ExperimentalFeature:
    '''This API may change without notice'''
    pass
- core.utils.experimental_api(func: Callable) Callable#
Mark a function or class as experimental API.
Use this decorator for:
- Experimental features that may change without notice
- New APIs under active development
- Features that are not yet stable
Objects marked with this decorator will be exempt from backward compatibility checks, allowing rapid iteration during development.
- Parameters:
func – The function or class to mark as experimental
- Returns:
The original function/class with an experimental API marker
.. rubric:: Example
@experimental_api
def new_experimental_feature():
    '''This API is experimental and may change'''
    pass

@experimental_api
class ExperimentalModel:
    '''This model is under active development'''
    pass
@experimental_api class ExperimentalModel: ‘’’This model is under active development’’’ pass