core.datasets.retro.utils#

Utilities for Retro preprocessing.

Module Contents#

Classes#

Block

Specific block arg type to mute mypy.

GPTToTextDataset

Dataset to convert GPT tokens to text.

BlockPathMap

Map an index to its containing block path.

Functions#

log_retro_rank_0

Log on rank 0.

retro_makedir

Make a directory, conditional on not being in validation mode.

extract_data_config

Extract data config from dataset.

get_num_chunks_per_sample

Compute seq_length // chunk_length.

get_blocks

Divide the range [0, n_samples) into a sequence of block ranges.

get_blocks_by_rank

Divide existing and missing blocks evenly across all ranks.

Data#

API#

core.datasets.retro.utils.logger#

‘getLogger(…)’

class core.datasets.retro.utils.Block#

Bases: typing.TypedDict

Specific block arg type to mute mypy.

Initialization

Initialize self. See help(type(self)) for accurate signature.

range: Tuple[int, int]#

None

path: str#

None
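A Block pairs a half-open sample index range with the path of the file that stores those samples. A minimal sketch of constructing one locally (the directory and file name below are illustrative, following the ‘{start_idx}-{end_idx}.hdf5’ convention described under get_blocks):

```python
from typing import Tuple, TypedDict

class Block(TypedDict):
    # Mirrors core.datasets.retro.utils.Block: a half-open sample range and
    # the path of the block file that stores those samples.
    range: Tuple[int, int]
    path: str

# Illustrative block covering samples [0, 100000), stored under the
# '{start_idx}-{end_idx}.hdf5' naming convention used by get_blocks().
block: Block = {"range": (0, 100_000), "path": "/path/to/blocks/0-100000.hdf5"}
print(block["path"])
```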

core.datasets.retro.utils.log_retro_rank_0(message: str) None#

Log on rank 0.

Parameters:

message (str) – Message to log.
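The intent is that only the global rank-0 process emits the message, so multi-GPU preprocessing runs do not log the same line once per rank. A rough sketch of the idea, assuming torch.distributed conventions (not the literal implementation):

```python
import logging

import torch

logger = logging.getLogger(__name__)

def log_rank_0_sketch(message: str) -> None:
    # Emit only on global rank 0; fall back to always logging when
    # torch.distributed has not been initialized (e.g., single-process runs).
    if not torch.distributed.is_initialized() or torch.distributed.get_rank() == 0:
        logger.info(message)

log_rank_0_sketch("building chunk database ...")
```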

core.datasets.retro.utils.retro_makedir(
config: megatron.core.datasets.retro.config.RetroPreprocessingConfig,
path: str,
) None#

Make a directory, conditional on not being in validation mode.

Parameters:
  • config (RetroPreprocessingConfig) – Retro preprocessing config.

  • path (str) – Path of the directory to create.

core.datasets.retro.utils.extract_data_config(
config: megatron.core.datasets.retro.config.RetroPreprocessingConfig,
) megatron.core.datasets.retro.query.multi_split_gpt_dataset.MultiSplitGPTDatasetConfig#

Extract data config from dataset.

Parameters:

config (RetroPreprocessingConfig) – Retro preprocessing config.

Returns:

The config object used to build the dataset.

core.datasets.retro.utils.get_num_chunks_per_sample(
sample_length: int,
chunk_length: int,
) int#

Compute seq_length // chunk_length.

Parameters:
  • sample_length (int) – Alias of sequence_length.

  • chunk_length (int) – Retro chunk length (e.g., 64).

Returns:

Number of chunks per sample (i.e., sequence_length / chunk_length).
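For example, a 2048-token sequence with the typical Retro chunk length of 64 yields 32 chunks per sample. A minimal sketch of the arithmetic (the divisibility check is an assumption here, not necessarily part of the real function):

```python
def num_chunks_per_sample_sketch(sample_length: int, chunk_length: int) -> int:
    # Assumed precondition: chunk_length divides sample_length evenly, so the
    # integer division loses nothing.
    assert sample_length % chunk_length == 0
    return sample_length // chunk_length

assert num_chunks_per_sample_sketch(2048, 64) == 32
```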

class core.datasets.retro.utils.GPTToTextDataset(
gpt_dataset: megatron.core.datasets.retro.query.multi_split_gpt_dataset.MultiSplitGPTDataset,
gpt_tokenizer: Any,
)#

Bases: torch.utils.data.Dataset

Dataset to convert GPT tokens to text.

Parameters:
  • gpt_dataset (MultiSplitGPTDataset) – GPT dataset, which outputs GPT token samples.

  • gpt_tokenizer (Any) – GPT tokenizer.

Initialization

__len__() int#

Dataset length.

Returns:

Number of samples in the dataset.

__getitem__(idx: int) dict#

Get dataset sample.

Parameters:

idx (int) – Index of sample.

Returns:

A dict containing attribute ‘text’ of type string.
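A hedged usage sketch: wrap an existing GPT dataset and tokenizer, then read detokenized text through a standard DataLoader. The build_gpt_dataset and build_tokenizer helpers are hypothetical placeholders for however those objects are constructed in your pipeline:

```python
from torch.utils.data import DataLoader

from megatron.core.datasets.retro.utils import GPTToTextDataset

gpt_dataset = build_gpt_dataset(...)    # hypothetical: returns a MultiSplitGPTDataset
gpt_tokenizer = build_tokenizer(...)    # hypothetical: tokenizer used to detokenize samples

text_dataset = GPTToTextDataset(gpt_dataset, gpt_tokenizer)

# Each item is a dict with a single 'text' key holding the detokenized sample.
loader = DataLoader(text_dataset, batch_size=8)
for batch in loader:
    print(batch["text"][0])
    break
```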

core.datasets.retro.utils.get_blocks(
dirname: str,
n_samples: int,
block_size: int,
validate: Optional[Callable] = None,
) types.SimpleNamespace#

Divide the range [0, n_samples) into a sequence of block ranges.

This is a core method within the concept of block processing. The idea is to divide a range (size n_samples) into a sequence of blocks. Each block corresponds to a file within ‘dirname’ with name ‘{start_idx}-{end_idx}.hdf5’. This method checks for the existence of these files, and returns two lists, one for existing blocks and one for missing blocks.

Parameters:
  • dirname (str) – Path to directory containing block files.

  • n_samples (int) – Ideal number of samples. The total number of samples saved across all block files is <= n_samples.

  • block_size (int) – Max number of samples per block file (e.g., 100000).

  • validate (Callable) – Method for validating each block file during load.

Returns:

Existing blocks and missing blocks. The total number of samples covered by the existing and missing blocks together should equal n_samples above.

Return type:

A namespace consisting of 2 lists
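A hedged usage sketch; the attribute names existing and missing on the returned namespace are inferred from the return description above, and the directory path is illustrative:

```python
from megatron.core.datasets.retro.utils import get_blocks

blocks = get_blocks(
    dirname="/path/to/blocks",   # illustrative directory of '{start}-{end}.hdf5' files
    n_samples=1_000_000,
    block_size=100_000,
    validate=None,               # optionally a callable that sanity-checks each existing file
)

# 'existing' blocks already have a file on disk; 'missing' blocks still need processing.
print(f"{len(blocks.existing)} existing, {len(blocks.missing)} missing")
for block in blocks.missing:
    start, end = block["range"]
    print(f"would process samples [{start}, {end}) -> {block['path']}")
```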

core.datasets.retro.utils.get_blocks_by_rank(
dirname: str,
n_samples: int,
block_size: int,
validate: Optional[Callable] = None,
sample: Optional[float] = None,
process_group: Optional[torch.distributed.ProcessGroup] = None,
) types.SimpleNamespace#

Divide existing and missing blocks evenly across all ranks.

See ‘get_blocks()’ above for description. The returned lists of existing and missing blocks are split evenly across ranks via interleaving. This way, each rank has a roughly equal number of blocks to process for a downstream operation.

Parameters:
  • dirname (str) – Path to directory containing block files.

  • n_samples (int) – Ideal number of samples. The total number of samples saved across all block files is <= n_samples.

  • block_size (int) – Max number of samples per block file (e.g., 100000).

  • validate (Callable) – Method for validating each block file during load.

  • sample (Optional[float]) – If provided, sample a random subset of the blocks. Used for validating preprocessing correctness.

  • process_group (Optional[ProcessGroup]) – Process group for distributed operations. If None, uses data parallel group.

Returns:

Existing blocks and missing blocks. Each of these two lists is potentially a sub-sample of the total set of existing and missing blocks, depending on whether sampling is used. Additionally, the attributes n_existing_world and n_missing_world give the total number of existing and missing blocks, independent of sampling. Therefore, (n_existing_world + n_missing_world) * block_size == n_samples.

Return type:

A namespace consisting of 2 lists
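A hedged sketch of how a rank might consume its interleaved share of the blocks; the attribute names follow the return description above, and the directory path is illustrative:

```python
import torch

from megatron.core.datasets.retro.utils import get_blocks_by_rank

# Each rank receives an interleaved subset of the blocks, so ranks can work on
# their own missing blocks independently.
blocks = get_blocks_by_rank(
    dirname="/path/to/blocks",   # illustrative block directory
    n_samples=1_000_000,
    block_size=100_000,
    validate=None,
    process_group=None,          # None falls back to the data-parallel group
)

for block in blocks.missing:
    start, end = block["range"]
    print(f"rank-local work: samples [{start}, {end}) -> {block['path']}")

# Typically followed by a barrier so all ranks finish before the next stage.
if torch.distributed.is_initialized():
    torch.distributed.barrier()
```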

class core.datasets.retro.utils.BlockPathMap(block_paths: List[str], block_size: int)#

Map an index to its containing block path.

The common use for this class is to have a directory of files containing blocks of processed data, of uniform block size (e.g., 100k samples per file). Each file must follow a naming convention of ‘startIdx-endIdx.[ext]’, where ‘endIdx’ minus ‘startIdx’ must equal the block size, with the possible exception of the final block. Given an input index, this class maps the index to the containing block file.

Parameters:
  • block_paths (List[str]) – List of paths to saved block files.

  • block_size (int) – Max number of samples per block file (e.g., 100000).

Initialization

classmethod from_dir(dir: str, block_size: int, ext: str = 'hdf5') Any#

Get list of block files, and create map.

Parameters:
  • dir (str) – Path to directory containing saved block files.

  • block_size (int) – Max number of samples per block file (e.g., 100000).

  • ext (str) – Block file extension (e.g., ‘hdf5’).

Returns:

A mapping of sample index to block file path.

__str__() str#

Stringify the mapping.

Returns:

A string representation of this block path map.

__getitem__(idx: int) str#

Get block path from index.

Parameters:

idx (int) – Index of sample.

Returns:

The path to the block file containing the sample index.
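
A hedged usage sketch of building the map from a directory and resolving a sample index to its block file; the directory contents are illustrative:

```python
from megatron.core.datasets.retro.utils import BlockPathMap

# Assume /path/to/blocks contains '0-100000.hdf5', '100000-200000.hdf5', ...
block_path_map = BlockPathMap.from_dir(dir="/path/to/blocks", block_size=100_000)

# With a 100k block size, sample 123456 resolves to the '100000-200000.hdf5' file.
print(block_path_map[123_456])
print(block_path_map)   # __str__ summarizes the index -> path mapping
```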