core.pipeline_parallel.hybrid_cp_schedule#
Module Contents#
Classes#
BalancedCPScheduler | This class provides the functionality to form groups of sub-samples such that all DPxCP ranks have a roughly balanced workload in the group.
Functions#
hybrid_context_parallel_forward_backward | Scheduler for Hybrid Context Parallel.
API#
- class core.pipeline_parallel.hybrid_cp_schedule.BalancedCPScheduler(
- max_seq_len_per_rank: int,
- dp_cp_group: torch.distributed.ProcessGroup,
This class forms groups of sub-samples such that all DPxCP ranks in a group have a roughly balanced workload.
Initialization
- get_total_workload(
- seq_length: int,
- cp_size: Optional[int] = None,
seq_length: sequence length of a sub-sample
cp_size: total number of CP ranks working on this sub-sample
Note: This function estimates the relative workload intensity of a sub-sample. It is not meant to be an accurate FLOPs calculator.
Returns: workload of a sub-sample
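The page does not give the formula behind the estimate. The sketch below shows one plausible shape for such a relative-cost function. The name `estimate_workload`, the quadratic cost model, and the fallback to a single rank when `cp_size` is omitted are all assumptions made for illustration, not the library's implementation.

```python
from typing import Optional


def estimate_workload(seq_length: int, cp_size: Optional[int] = None) -> float:
    """Hypothetical relative-cost estimate for one sub-sample."""
    if cp_size is None:
        # Assumption: with no CP size given, treat the sub-sample as
        # running on a single rank.
        cp_size = 1
    # Assumption: attention cost grows quadratically with sequence length
    # and is split evenly across the CP ranks sharing the sub-sample.
    return (seq_length * seq_length) / cp_size
```

Only the ratio between two such values is meaningful: under this model, doubling the sequence length at a fixed CP size quadruples the estimate.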
- gpus_needed(seq_len: int) → int#
Calculates the number of GPUs needed for a given sequence length and max sequence length per CP rank. This is used to determine the CP size of a sub-sample.
The number is rounded up to the next power of 2 to match the available hybrid context parallel process group sizes.
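The rounding rule described above can be sketched as a standalone function. `gpus_needed_sketch` is a hypothetical name, and `max_seq_len_per_rank` is passed explicitly here instead of being read from the scheduler instance; the actual method may differ in detail.

```python
def gpus_needed_sketch(seq_len: int, max_seq_len_per_rank: int) -> int:
    """Illustrative CP size for a sequence: ceil-divide, then round up to a power of 2."""
    # Ceiling division: how many ranks are needed so that each holds at most
    # max_seq_len_per_rank tokens. At least one rank is always required.
    ranks = max(1, -(-seq_len // max_seq_len_per_rank))
    # Round up to the next power of 2 (a power of 2 maps to itself).
    return 1 << (ranks - 1).bit_length()


print(gpus_needed_sketch(4096, 4096))   # 1
print(gpus_needed_sketch(4097, 4096))   # 2
print(gpus_needed_sketch(12000, 4096))  # 4 (3 ranks rounded up)
print(gpus_needed_sketch(20000, 4096))  # 8 (5 ranks rounded up)
```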
- make_buckets_equal(
- sample_seqlens: List[Tuple[int, int]],
- compute_estimator: Callable[[int], float],
Makes as many buckets as unique CP sizes needed. This keeps sample IDs tethered to their sequence lengths throughout the bucketing process.
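The bucketing algorithm itself is not documented here. As a rough sketch under stated assumptions, one way to build at most as many buckets as there are unique CP sizes, while keeping each `(sample_id, seq_len)` pair intact, is to sort by length and cut the list into contiguous chunks of roughly equal estimated compute. The function name, the `cp_size_of` callback, and the splitting rule are all hypothetical.

```python
from typing import Callable, List, Tuple

Sample = Tuple[int, int]  # (sample_id, seq_len)


def make_buckets_sketch(
    sample_seqlens: List[Sample],
    compute_estimator: Callable[[int], float],
    cp_size_of: Callable[[int], int],  # hypothetical: maps seq_len to CP size
) -> List[List[Sample]]:
    if not sample_seqlens:
        return []
    # Bucket count is capped at the number of distinct CP sizes in the input.
    k = len({cp_size_of(seq_len) for _, seq_len in sample_seqlens})
    # Longest sequences first; IDs travel with their lengths.
    ordered = sorted(sample_seqlens, key=lambda s: s[1], reverse=True)
    target = sum(compute_estimator(s) for _, s in ordered) / k
    buckets: List[List[Sample]] = [[]]
    acc = 0.0
    for sample_id, seq_len in ordered:
        work = compute_estimator(seq_len)
        # Open a new bucket once the current one would exceed its share.
        if buckets[-1] and acc + work > target and len(buckets) < k:
            buckets.append([])
            acc = 0.0
        buckets[-1].append((sample_id, seq_len))
        acc += work
    return buckets


samples = [(0, 8), (1, 4), (2, 4), (3, 2), (4, 2)]
buckets = make_buckets_sketch(
    samples,
    compute_estimator=lambda s: float(s * s),
    cp_size_of=lambda s: 4 if s > 4 else 1,
)
print(buckets)  # [[(0, 8)], [(1, 4), (2, 4), (3, 2), (4, 2)]]
```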
- next_hdp_group(
- sample_seqlens: List[Tuple[int, int]],
- compute_estimator: Callable[[int], float],
- total_gpus: int,
- delta: float = 0.05,
- strategy: str = 'dp',
- eps_bucket: float = 0.1,
Given a list of (sample_id, sequence_length) tuples, this function aims to assign sequences in a group such that all GPUs in the DPxCP group have a roughly balanced workload. Once each group is roughly balanced, we exit and return the group and the leftover sequences.
The function performs the following passes in order to form a balanced microbatch:
1. We create buckets of sequences that are roughly balanced. We try to create as many buckets as there are possible CP sizes.
2. Given a bucket has sequences available, we assign the sample
   a. To a new set of GPUs if there are enough free GPUs.
   b. To an existing set of GPUs with the lowest load.
3. We check if the group is balanced whenever we need to move onto a new CP size in the same set of GPUs.
4. We trim the group if removing the last added sequence helps improve balance.
5. If we run out of sequences to assign and there are empty GPUs, we redistribute work to empty GPUs by recursively increasing the CP size of a sample until no empty GPUs are left.
Returns (micro_batches, leftover_sample_seqlens, exec_times, sample_ids_per_gpu).
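The assignment pass (place a sample on free GPUs if possible, otherwise on the least-loaded existing GPU set) can be illustrated with a much-simplified sketch. It omits bucketing, trimming, and redistribution. The names `greedy_place`, `imbalance`, and `cp_size_of`, and the choice to spread a sample's estimated compute evenly over its GPUs, are assumptions for illustration. Reading `delta` as a tolerance on such an imbalance ratio is also an assumption; the page does not define it.

```python
from typing import Callable, List, Tuple


def greedy_place(
    samples: List[Tuple[int, int]],        # (sample_id, seq_len)
    cp_size_of: Callable[[int], int],      # hypothetical: seq_len -> CP size
    compute_estimator: Callable[[int], float],
    total_gpus: int,
):
    loads = [0.0] * total_gpus
    ids_per_gpu: List[List[int]] = [[] for _ in range(total_gpus)]
    gpu_sets: List[Tuple[int, int]] = []   # (first_gpu, size) of opened sets
    next_free = 0
    leftovers: List[Tuple[int, int]] = []
    for sample_id, seq_len in samples:
        size = cp_size_of(seq_len)
        # Assumption: a sample's work is split evenly over its CP ranks.
        per_gpu_work = compute_estimator(seq_len) / size
        if next_free + size <= total_gpus:
            # Enough free GPUs: open a new set.
            start = next_free
            next_free += size
            gpu_sets.append((start, size))
        else:
            # Reuse the least-loaded existing set of the same size, if any.
            same_size = [g for g in gpu_sets if g[1] == size]
            if not same_size:
                leftovers.append((sample_id, seq_len))
                continue
            start = min(same_size, key=lambda g: loads[g[0]])[0]
        for gpu in range(start, start + size):
            loads[gpu] += per_gpu_work
            ids_per_gpu[gpu].append(sample_id)
    return loads, ids_per_gpu, leftovers


def imbalance(loads: List[float]) -> float:
    """Relative gap between the busiest and the idlest GPU."""
    peak = max(loads)
    return 0.0 if peak == 0 else (peak - min(loads)) / peak


loads, ids_per_gpu, leftovers = greedy_place(
    [(0, 8), (1, 4), (2, 4), (3, 4)],
    cp_size_of=lambda s: 2 if s > 4 else 1,
    compute_estimator=lambda s: float(s * s),
    total_gpus=4,
)
print(loads)             # [32.0, 32.0, 32.0, 16.0]
print(ids_per_gpu)       # [[0], [0], [1, 3], [2]]
print(imbalance(loads))  # 0.5
```

In this toy run the imbalance of 0.5 is far above a tolerance like 0.05, which is the kind of situation where a trimming step (dropping the last added sample) could help.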
- get_groups_and_subsamples(sample_id_seqlens, config)#
This function recursively forms groups of sub-samples such that all DPxCP ranks have a roughly balanced workload in the group.
- core.pipeline_parallel.hybrid_cp_schedule.hybrid_context_parallel_forward_backward(
- forward_step_func,
- data_iterator,
- model,
- num_microbatches,
- input_tensor,
- output_tensor_grad,
- forward_data_store,
- config,
- collect_non_loss_data,
- first_val_step,
- forward_only,
- no_sync_func,
- total_num_tokens,
- check_first_val_step,
- model_type,
Scheduler for Hybrid Context Parallel.
This function performs the packed sample scheduling and determines:
1. The number of microbatches to schedule for each CP rank
2. The number of groups each CP rank should execute
3. The number of sub-samples per group each CP rank should execute
A group is a set of samples that can run across the CP domain without any barrier. There are several reasons why an unlimited number of samples cannot run within a single group. For example, suppose we have 8 GPUs, GPUs 0-5 are assigned a long sample that requires CP6, and GPUs 6-7 are assigned a short sample that requires CP2. The next sample, which requires CP4, could be assigned to GPUs 4-7. However, GPUs 6-7 will finish first and deadlock if GPUs 4-5 are not participating in the group.
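One way to read the example is that a sample can join a group only if its GPU set is either identical to or disjoint from every GPU set already used in that group. This rule is an interpretation of the example rather than a documented invariant, and `fits_in_group` is a hypothetical helper.

```python
from typing import List, Set


def fits_in_group(existing: List[Set[int]], candidate: Set[int]) -> bool:
    """True if the candidate GPU set never partially overlaps a set in use."""
    return all(
        candidate == used or candidate.isdisjoint(used) for used in existing
    )


# GPUs 0-5 run the long sample, GPUs 6-7 run the short one.
group = [set(range(0, 6)), set(range(6, 8))]

# A sample on GPUs 4-7 straddles both sets, so it must wait for a new group.
print(fits_in_group(group, set(range(4, 8))))  # False
# A second short sample on GPUs 6-7 reuses an existing set and is fine.
print(fits_in_group(group, set(range(6, 8))))  # True
```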