core.datasets.indexed_dataset#

Module Contents#

Classes#

DType

The NumPy data type Enum for writing/reading the IndexedDataset indices

_IndexWriter

Object class to write the index (.idx) file

_IndexReader

Object class to read the index (.idx) file

_BinReader

Abstract class to read the data (.bin) file

_MMapBinReader

A _BinReader that memory maps the data (.bin) file

_FileBinReader

A _BinReader that reads from the data (.bin) file using a file pointer

_S3BinReader

A _BinReader that reads from the data (.bin) file from S3

_MultiStorageClientBinReader

A _BinReader that reads from the data (.bin) file using the multi-storage client.

IndexedDataset

The low-level interface dataset class

IndexedDatasetBuilder

Builder class for the IndexedDataset class

Functions#

get_idx_path

Get the path to the index file from the prefix

get_bin_path

Get the path to the data file from the prefix

Data#

API#

core.datasets.indexed_dataset.logger#

‘getLogger(…)’

core.datasets.indexed_dataset._INDEX_HEADER#

b'MMIDIDX\x00\x00'

class core.datasets.indexed_dataset.DType(*args, **kwds)#

Bases: enum.Enum

The NumPy data type Enum for writing/reading the IndexedDataset indices

Initialization

uint8#

1

int8#

2

int16#

3

int32#

4

int64#

5

float64#

6

float32#

7

uint16#

8

classmethod code_from_dtype(value: Type[numpy.number]) int#

Get the code from the dtype

Parameters:

value (Type[numpy.number]) – The dtype

Returns:

The code

Return type:

int

classmethod dtype_from_code(value: int) Type[numpy.number]#

Get the dtype from the code

Parameters:

value (int) – The code

Returns:

The dtype

Return type:

Type[numpy.number]
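The two classmethods are inverses of each other. A minimal sketch of that round trip, assuming a local stand-in enum (`DTypeSketch` is hypothetical, not the library class) that reuses the member names and codes listed above and resolves NumPy types by name:

```python
from enum import Enum

import numpy as np


class DTypeSketch(Enum):
    """Local stand-in mirroring the codes listed above (not the library class)."""

    uint8 = 1
    int8 = 2
    int16 = 3
    int32 = 4
    int64 = 5
    float64 = 6
    float32 = 7
    uint16 = 8

    @classmethod
    def code_from_dtype(cls, value):
        # Look the member up by the NumPy scalar type's name, e.g. "int32".
        return cls[value.__name__].value

    @classmethod
    def dtype_from_code(cls, value):
        # Look the member up by code, then resolve the NumPy type by name.
        return getattr(np, cls(value).name)


code = DTypeSketch.code_from_dtype(np.int32)
restored = DTypeSketch.dtype_from_code(code)
```

Storing the small integer code instead of the type name keeps the on-disk representation of the dtype to a single number.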

static size(key: Union[int, Type[numpy.number]]) int#

Get the size of the dtype/code in bytes

Parameters:

key (Union[int, Type[numpy.number]]) – The dtype or code

Raises:

ValueError – If the key is neither dtype nor integer code

Returns:

The size of the dtype/code in bytes

Return type:

int
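As an illustration of accepting either form of key, the sketch below uses a hypothetical helper `size_in_bytes` and an assumed lookup table copied from the enum members documented above. It is not the library's code, only one way the described behaviour could look:

```python
import numpy as np

# Assumed code-to-dtype table, copied from the enum members documented above.
CODE_TO_DTYPE = {
    1: np.uint8, 2: np.int8, 3: np.int16, 4: np.int32,
    5: np.int64, 6: np.float64, 7: np.float32, 8: np.uint16,
}


def size_in_bytes(key):
    """Hypothetical helper: bytes per element for an integer code or a NumPy type."""
    if isinstance(key, int):
        dtype = CODE_TO_DTYPE[key]
    elif isinstance(key, type) and issubclass(key, np.number):
        dtype = key
    else:
        raise ValueError(f"{key!r} is neither a dtype nor an integer code")
    return np.dtype(dtype).itemsize
```

For example, `size_in_bytes(4)` and `size_in_bytes(np.int32)` both describe a 4-byte element, while a string key raises `ValueError`.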

static optimal_dtype(
cardinality: Optional[int],
) Type[numpy.number]#

Get the dtype to use for an index of a certain cardinality

Parameters:

cardinality (Optional[int]) – The number of elements to be indexed

Returns:

The dtype to use for the index

Return type:

Type[numpy.number]
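A typical use is choosing a token dtype from a vocabulary size: a small cardinality fits a 2-byte type, and anything larger (or an unknown cardinality) falls back to a wider one. This page does not state the selection rule, so the cutoff and the two candidate types in the sketch below are assumptions for illustration only, and `pick_index_dtype` is a hypothetical name:

```python
from typing import Optional

import numpy as np


def pick_index_dtype(cardinality: Optional[int], cutoff: int = 65500):
    """Hypothetical helper; `cutoff` and both return types are assumptions."""
    if cardinality is not None and cardinality < cutoff:
        # Every id in [0, cardinality) fits in an unsigned 16-bit integer.
        return np.uint16
    # Unknown or large cardinality: fall back to a wider integer type.
    return np.int32
```

Choosing the narrower type halves the size of the data file when the vocabulary allows it.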

class core.datasets.indexed_dataset._IndexWriter(idx_path: str, dtype: Type[numpy.number])#

Bases: object

Object class to write the index (.idx) file

Parameters:
  • idx_path (str) – The path to the index file

  • dtype (Type[numpy.number]) – The dtype of the index file

Initialization

__enter__() core.datasets.indexed_dataset._IndexWriter#

Enter the context introduced by the ‘with’ keyword

Returns:

The instance

Return type:

_IndexWriter

__exit__(
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[types.TracebackType],
) Optional[bool]#

Exit the context introduced by the ‘with’ keyword

Parameters:
  • exc_type (Optional[Type[BaseException]]) – Exception type

  • exc_val (Optional[BaseException]) – Exception value

  • exc_tb (Optional[TracebackType]) – Exception traceback object

Returns:

Whether to silence the exception

Return type:

Optional[bool]

write(
sequence_lengths: collections.abc.Iterable[Union[int, numpy.integer]],
sequence_modes: Optional[collections.abc.Iterable[Union[int, numpy.integer]]],
document_indices: collections.abc.Iterable[Union[int, numpy.integer]],
) None#

Write the index (.idx) file

Parameters:
  • sequence_lengths (List[int]) – The length of each sequence

  • sequence_modes (Optional[List[int]]) – The mode of each sequence

  • document_indices (List[int]) – The sequence indices demarcating the end of each document

_sequence_pointers(
sequence_lengths: collections.abc.Iterable[Union[int, numpy.integer]],
) List[int]#

Build the sequence pointers per the sequence lengths and dtype size

Parameters:

sequence_lengths (List[int]) – The length of each sequence

Returns:

The pointer to the beginning of each sequence

Return type:

List[int]
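The pointers amount to an exclusive prefix sum of the sequence lengths, scaled by the element size. A minimal sketch, assuming the pointers are byte offsets into the data file and using a hypothetical helper name:

```python
from itertools import accumulate


def sequence_pointers(sequence_lengths, itemsize):
    """Hypothetical helper: byte offset of the start of each sequence."""
    byte_lengths = [int(length) * itemsize for length in sequence_lengths]
    # Exclusive prefix sum: the first sequence starts at byte 0 and each
    # later one starts where the previous sequence ended.
    return list(accumulate(byte_lengths, initial=0))[:-1]


# Three sequences of 3, 2 and 4 tokens stored as 4-byte integers.
pointers = sequence_pointers([3, 2, 4], itemsize=4)
```

Here `pointers` is `[0, 12, 20]`: the second sequence starts after 3 tokens of 4 bytes each, and the third after 5 tokens.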

class core.datasets.indexed_dataset._IndexReader(idx_path: str, multimodal: bool)#

Bases: object

Object class to read the index (.idx) file

Parameters:
  • idx_path (str) – The path to the index file

  • multimodal (bool) – Whether the dataset is multimodal

Initialization

__del__() None#

Clean up the object

__len__() int#

Return the length of the dataset

Returns:

The length of the dataset

Return type:

int

__getitem__(
idx: int,
) Tuple[numpy.int32, numpy.int64, Optional[numpy.int8]]#

Return the pointer, length, and mode at the index

Parameters:

idx (int) – The index into the dataset

Returns:

The pointer, length and mode at the index

Return type:

Tuple[numpy.int32, numpy.int64, Optional[numpy.int8]]

class core.datasets.indexed_dataset._BinReader#

Bases: abc.ABC

Abstract class to read the data (.bin) file

abstractmethod read(
dtype: Type[numpy.number],
count: int,
offset: int,
) numpy.ndarray#

Read bytes into a numpy array.

Parameters:
  • dtype (Type[numpy.number]) – Data-type of the returned array.

  • count (int) – Number of items to read.

  • offset (int) – Start reading from this offset (in bytes).

Returns:

An array with count items and data-type dtype constructed from reading bytes from the data file starting at offset.

Return type:

numpy.ndarray
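To make the `read` contract concrete, the sketch below defines a stand-in abstract base and an in-memory subclass. Both class names are hypothetical, and the in-memory reader exists only to show how `dtype`, `count`, and `offset` interact:

```python
from abc import ABC, abstractmethod

import numpy as np


class BinReaderSketch(ABC):
    """Stand-in for the abstract reader: subclasses decide where bytes come from."""

    @abstractmethod
    def read(self, dtype, count, offset):
        """Return `count` items of `dtype`, starting `offset` bytes into the data."""


class BytesBinReader(BinReaderSketch):
    """Hypothetical reader backed by an in-memory bytes object."""

    def __init__(self, payload: bytes):
        self._payload = payload

    def read(self, dtype, count, offset):
        # `offset` is measured in bytes; `count` is measured in items.
        return np.frombuffer(self._payload, dtype=dtype, count=count, offset=offset)


payload = np.arange(6, dtype=np.int32).tobytes()
reader = BytesBinReader(payload)
# Skip the first two 4-byte items, then read three items.
tokens = reader.read(np.int32, count=3, offset=2 * 4)
```

Note the unit mismatch that callers have to handle: skipping two `int32` items means passing an offset of 8 bytes.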

class core.datasets.indexed_dataset._MMapBinReader(bin_path: str)#

Bases: core.datasets.indexed_dataset._BinReader

A _BinReader that memory maps the data (.bin) file

Parameters:

bin_path (str) – The path to the data (.bin) file.

Initialization

read(
dtype: Type[numpy.number],
count: int,
offset: int,
) numpy.ndarray#

Read bytes into a numpy array.

Parameters:
  • dtype (Type[numpy.number]) – Data-type of the returned array.

  • count (int) – Number of items to read.

  • offset (int) – Start reading from this offset (in bytes).

Returns:

An array with count items and data-type dtype constructed from reading bytes from the data file starting at offset.

Return type:

numpy.ndarray
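One way to read a window from a memory-mapped file is to map the file as raw bytes and reinterpret a slice of it. This is a sketch under that assumption, with a hypothetical function name, and not necessarily how the class is implemented:

```python
import os
import tempfile

import numpy as np


def read_with_mmap(bin_path, dtype, count, offset):
    """Hypothetical helper: read `count` items at byte `offset` via a memory map."""
    # Map the whole file read-only as raw bytes (np.memmap defaults to uint8).
    mapped = np.memmap(bin_path, mode="r", order="C")
    # Reinterpret the requested span; copy so the result outlives the mapping.
    return np.frombuffer(mapped, dtype=dtype, count=count, offset=offset).copy()


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "tokens.bin")
    np.arange(10, dtype=np.int32).tofile(path)
    window = read_with_mmap(path, np.int32, count=3, offset=2 * 4)
```

Only the pages backing the requested span need to be loaded by the operating system, which is why memory mapping suits large local files.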

__del__() None#

Clean up the object.

class core.datasets.indexed_dataset._FileBinReader(bin_path: str)#

Bases: core.datasets.indexed_dataset._BinReader

A _BinReader that reads from the data (.bin) file using a file pointer

Parameters:

bin_path (str) – The path to the data (.bin) file.

Initialization

read(
dtype: Type[numpy.number],
count: int,
offset: int,
) numpy.ndarray#

Read bytes into a numpy array.

Parameters:
  • dtype (Type[numpy.number]) – Data-type of the returned array.

  • count (int) – Number of items to read.

  • offset (int) – Start reading from this offset (in bytes).

Returns:

An array with count items and data-type dtype constructed from reading bytes from the data file starting at offset.

Return type:

numpy.ndarray
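Reading through a file pointer can be sketched as a seek followed by a read into a preallocated array. The function name below is hypothetical and the short-read check is an addition for the example:

```python
import os
import tempfile

import numpy as np


def read_with_file_pointer(bin_path, dtype, count, offset):
    """Hypothetical helper: seek to `offset` bytes and fill a new array."""
    out = np.empty(count, dtype=dtype)
    with open(bin_path, mode="rb", buffering=0) as handle:
        handle.seek(offset)
        nbytes = handle.readinto(out)
    if nbytes != out.nbytes:
        # Guard against returning partially uninitialized memory.
        raise EOFError(f"expected {out.nbytes} bytes, got {nbytes}")
    return out


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "tokens.bin")
    np.arange(10, dtype=np.int32).tofile(path)
    window = read_with_file_pointer(path, np.int32, count=3, offset=2 * 4)
```

Unlike a memory map, this approach opens and closes the file on every call and holds no long-lived mapping.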

class core.datasets.indexed_dataset._S3BinReader(
bin_path: str,
object_storage_config: megatron.core.datasets.object_storage_utils.ObjectStorageConfig,
)#

Bases: core.datasets.indexed_dataset._BinReader

A _BinReader that reads from the data (.bin) file from S3

Parameters:
  • bin_path (str) – The path to the data (.bin) file.

  • object_storage_config (ObjectStorageConfig) – The object storage config. Its bin_chunk_nbytes value controls the in-memory cache that speeds up calls to the read method: on a cache miss, this number of bytes is downloaded to refresh the cache.

Initialization

_extract_from_cache(offset: int, size: int) bytes#

Extract size bytes starting at offset bytes into the cache

read(
dtype: Type[numpy.number],
count: int,
offset: int,
) numpy.ndarray#

Read bytes into a numpy array.

Let size be the count * DType.size(dtype). If the requested span of bytes [offset, offset + size) is covered by the in-memory cache maintained by this class, then this function extracts the requested span from that cache and returns it. Otherwise, this function first refreshes the cache and then extracts the requested span from the refreshed cache and returns it.

The cache is refreshed based on offset and size. In particular, we divide all the bytes in an S3 object into blocks, where each block contains bin_chunk_nbytes bytes. We assign each block an index starting from 0. We take the block with index (offset // bin_chunk_nbytes) to refresh the cache. If this new block still does not cover the requested span, we extend it just enough to include offset + size.

Parameters:
  • dtype (Type[numpy.number]) – Data-type of the returned array.

  • count (int) – Number of items to read.

  • offset (int) – Start reading from this offset (in bytes).

Returns:

An array with count items and data-type dtype constructed from reading bytes from the data file starting at offset.

Return type:

numpy.ndarray
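The cache refresh rule described for `read` can be sketched without any S3 dependency. In the example below, a `bytes` object stands in for the remote object, and the class name and `downloads` counter are hypothetical additions used to make cache hits and misses observable:

```python
class ChunkCacheSketch:
    """Hypothetical stand-in: `blob` plays the role of the remote S3 object."""

    def __init__(self, blob: bytes, bin_chunk_nbytes: int):
        assert bin_chunk_nbytes > 0
        self._blob = blob
        self._chunk = bin_chunk_nbytes
        self._cache = b""
        self._cache_start = 0
        self.downloads = 0  # counts simulated range requests

    def _covered(self, offset, size):
        cache_end = self._cache_start + len(self._cache)
        return self._cache_start <= offset and offset + size <= cache_end

    def read_bytes(self, offset, size):
        if not self._covered(offset, size):
            # Align the refresh to the block that contains `offset` ...
            start = (offset // self._chunk) * self._chunk
            # ... and extend it just enough if the span crosses the block end.
            end = max(start + self._chunk, offset + size)
            self._cache = self._blob[start:end]
            self._cache_start = start
            self.downloads += 1
        begin = offset - self._cache_start
        return self._cache[begin:begin + size]


blob = bytes(range(32))
cache = ChunkCacheSketch(blob, bin_chunk_nbytes=8)
first = cache.read_bytes(10, 4)   # miss: fetches block 1, bytes [8, 16)
second = cache.read_bytes(12, 4)  # hit: still inside [8, 16)
third = cache.read_bytes(14, 6)   # miss: block 1 extended to cover byte 20
```

Sequential reads that stay within one block cost a single download, which is the case this cache is meant to speed up.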

__del__() None#

Clean up the object

class core.datasets.indexed_dataset._MultiStorageClientBinReader(
bin_path: str,
object_storage_config: megatron.core.datasets.object_storage_utils.ObjectStorageConfig,
)#

Bases: core.datasets.indexed_dataset._BinReader

A _BinReader that reads from the data (.bin) file using the multi-storage client.

Parameters:
  • bin_path (str) – The path to the data (.bin) file.

  • object_storage_config (ObjectStorageConfig) – The object storage config.

Initialization

read(
dtype: Type[numpy.number],
count: int,
offset: int,
) numpy.ndarray#

core.datasets.indexed_dataset.OBJECT_STORAGE_BIN_READERS#

None

class core.datasets.indexed_dataset.IndexedDataset(
path_prefix: str,
multimodal: bool = False,
mmap: bool = True,
object_storage_config: Optional[megatron.core.datasets.object_storage_utils.ObjectStorageConfig] = None,
s3_config: Optional[megatron.core.datasets.object_storage_utils.S3Config] = None,
)#

Bases: torch.utils.data.Dataset

The low-level interface dataset class

Parameters:
  • path_prefix (str) – The index (.idx) and data (.bin) prefix

  • multimodal (bool) – Whether the dataset is multimodal. Defaults to False.

  • mmap (bool) – Whether to mmap the .bin files. Defaults to True.

  • object_storage_config (Optional[ObjectStorageConfig]) – Supplied only for data stored on S3 or MSC. IndexedDataset downloads the index (.idx) file to object_storage_config.path_to_idx_cache and streams data from the data (.bin) file in object_storage_config.bin_chunk_nbytes blocks. Note that mmap must be disabled for S3 data loading. Defaults to None.

Initialization

initialize(
path_prefix: str,
multimodal: bool,
mmap: bool,
object_storage_config: Optional[megatron.core.datasets.object_storage_utils.ObjectStorageConfig],
) None#

Initialize the dataset

This method is called by IndexedDataset.__init__ during object creation and by IndexedDataset.__setstate__ during un-pickling

Parameters:
  • path_prefix (str) – The index (.idx) and data (.bin) prefix

  • multimodal (bool) – Whether the dataset is multimodal

  • mmap (bool) – Whether to mmap the .bin file

  • object_storage_config (Optional[ObjectStorageConfig]) – See IndexedDataset docstring for details.

__getstate__() Tuple[str, bool, bool, Optional[megatron.core.datasets.object_storage_utils.ObjectStorageConfig]]#

Get the state during pickling

Returns:

The state tuple

Return type:

Tuple[str, bool, bool, Optional[ObjectStorageConfig]]

__setstate__(
state: Tuple[str, bool, bool, Optional[megatron.core.datasets.object_storage_utils.ObjectStorageConfig]],
) None#

Set the state during un-pickling

Parameters:

state (Tuple[str, bool, bool, Optional[ObjectStorageConfig]]) – The state tuple
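The pattern behind `initialize`, `__getstate__`, and `__setstate__` is to serialize only the constructor arguments and rebuild unpicklable resources on the receiving side. A minimal sketch with a hypothetical class, where a plain `object()` stands in for open files or memory maps:

```python
class ReopenOnUnpickle:
    """Hypothetical class showing the initialize / getstate / setstate pattern."""

    def __init__(self, path_prefix, multimodal=False, mmap=True):
        self.initialize(path_prefix, multimodal, mmap)

    def initialize(self, path_prefix, multimodal, mmap):
        self.path_prefix = path_prefix
        self.multimodal = multimodal
        self.mmap = mmap
        # Stand-in for resources that cannot be pickled (files, memory maps).
        self.handle = object()

    def __getstate__(self):
        # Only the lightweight arguments travel with the pickle.
        return self.path_prefix, self.multimodal, self.mmap

    def __setstate__(self, state):
        # Re-run initialization so the resources are reopened locally.
        self.initialize(*state)


original = ReopenOnUnpickle("data/corpus")

# What unpickling does for such a class: create a bare instance without
# calling __init__, then hand it the saved state.
state = original.__getstate__()
clone = ReopenOnUnpickle.__new__(ReopenOnUnpickle)
clone.__setstate__(state)
```

This matters when a dataset object is sent to data loader worker processes, since each worker then opens its own handles.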

__del__() None#

Clean up the object

__len__() int#

Return the length of the dataset, i.e., the number of sequences in the index

Returns:

The length of the dataset

Return type:

int

__getitem__(
idx: Union[int, numpy.integer, slice],
) Union[numpy.ndarray, Tuple[numpy.ndarray, numpy.number], List[numpy.ndarray], Tuple[List[numpy.ndarray], numpy.ndarray]]#

Return from the dataset

Parameters:

idx (Union[int, numpy.integer, slice]) – The index or index slice into the dataset

Raises:
  • ValueError – When the index slice is non-contiguous

  • TypeError – When the index is of an unexpected type

Returns:

The sequence tokens and modes at the index or index slice

Return type:

Union[numpy.ndarray, Tuple[numpy.ndarray, numpy.number], List[numpy.ndarray], Tuple[List[numpy.ndarray], numpy.ndarray]]
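The two documented error cases can be illustrated with a small index resolver. `resolve_index` is a hypothetical helper that only decides which sequence indices are selected; it does not read any data:

```python
import numbers


def resolve_index(idx, dataset_length):
    """Hypothetical helper: list the sequence indices selected by `idx`."""
    if isinstance(idx, slice):
        start, stop, step = idx.indices(dataset_length)
        if step != 1:
            # A stride would select sequences that are not adjacent on disk.
            raise ValueError("index slices must be contiguous")
        return list(range(start, stop))
    if isinstance(idx, numbers.Integral):
        # Covers Python ints and NumPy integer scalars alike.
        return [int(idx)]
    raise TypeError(f"unexpected index type: {type(idx).__name__}")
```

For a dataset of length 10, `slice(1, 4)` resolves to `[1, 2, 3]`, `slice(0, 10, 2)` raises `ValueError`, and a string index raises `TypeError`.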

get(
idx: int,
offset: int = 0,
length: Optional[int] = None,
) Union[numpy.ndarray, Tuple[numpy.ndarray, numpy.number]]#

Retrieve a single item from the dataset with the option to only return a portion of the item.

get(idx) is equivalent to indexing with [idx], except that get() does not support slicing.

Parameters:
  • idx (Union[int, numpy.integer]) – The index into the dataset

  • offset (int) – The integer token offset in the sequence

  • length (Optional[int]) – The number of tokens to grab from the sequence. Defaults to None.

Returns:

The sequence tokens and mode at the index

Return type:

Union[numpy.ndarray, Tuple[numpy.ndarray, numpy.number]]
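A partial read combines the sequence's byte pointer with a token offset. The sketch below uses a hypothetical helper and an in-memory buffer, and it assumes that a `length` of None means "read to the end of the sequence", which this page does not state explicitly:

```python
import numpy as np


def get_partial(buffer, pointer, sequence_length, dtype, offset=0, length=None):
    """Hypothetical helper: read part of one sequence from a raw byte buffer."""
    if length is None:
        # Assumption: no length means everything after `offset`.
        length = sequence_length - offset
    itemsize = np.dtype(dtype).itemsize
    # `pointer` is in bytes, `offset` is in tokens, so scale the latter.
    return np.frombuffer(
        buffer, dtype=dtype, count=length, offset=pointer + offset * itemsize
    )


first = np.array([10, 11, 12], dtype=np.int32)
second = np.array([20, 21, 22, 23], dtype=np.int32)
buffer = np.concatenate([first, second]).tobytes()

# The second sequence starts after 3 tokens of 4 bytes each.
middle = get_partial(buffer, pointer=12, sequence_length=4, dtype=np.int32,
                     offset=1, length=2)
tail = get_partial(buffer, pointer=12, sequence_length=4, dtype=np.int32,
                   offset=1)
```

Reading only the needed tokens avoids loading an entire long sequence when a sample uses just a window of it.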

property sequence_lengths: numpy.ndarray#

Get the sequence lengths

Returns:

The sequence lengths

Return type:

numpy.ndarray

property document_indices: numpy.ndarray#

Get the document indices

Returns:

The document indices

Return type:

numpy.ndarray

get_document_indices() numpy.ndarray#

Get the document indices

This method is slated for deprecation.

Returns:

The document indices

Return type:

numpy.ndarray

set_document_indices(document_indices: numpy.ndarray) None#

Set the document indices

This method is slated for deprecation.

Parameters:

document_indices (numpy.ndarray) – The document indices

property sequence_modes: numpy.ndarray#

Get the sequence modes

Returns:

The sequence modes

Return type:

numpy.ndarray

static exists(path_prefix: str) bool#

Return whether the IndexedDataset exists on disk at the prefix

Parameters:

path_prefix (str) – The prefix to the index (.idx) and data (.bin) files

Returns:

Whether the IndexedDataset exists on disk at the prefix

Return type:

bool

class core.datasets.indexed_dataset.IndexedDatasetBuilder(
bin_path: str,
dtype: Type[numpy.number] = numpy.int32,
multimodal: bool = False,
)#

Bases: object

Builder class for the IndexedDataset class

Parameters:
  • bin_path (str) – The path to the data (.bin) file

  • dtype (Type[numpy.number], optional) – The dtype of the index file. Defaults to numpy.int32.

  • multimodal (bool, optional) – Whether the dataset is multimodal. Defaults to False.

Initialization

add_item(tensor: torch.Tensor, mode: int = 0) None#

Add a single item to the dataset

Parameters:
  • tensor (torch.Tensor) – The item to add to the data file

  • mode (int, optional) – The mode for the item. Defaults to 0.

add_document(
tensor: torch.Tensor,
lengths: List[int],
modes: Optional[List[int]] = None,
) None#

Add an entire document to the dataset

Parameters:
  • tensor (torch.Tensor) – The document to add

  • lengths (List[int]) – The lengths of each item in the document

  • modes (Optional[List[int]], optional) – The modes for each item in the document. Defaults to None.

end_document() None#

Finalize the document, for use with IndexedDatasetBuilder.add_item

add_index(path_prefix: str) None#

Add an entire IndexedDataset to the dataset

Parameters:

path_prefix (str) – The index (.idx) and data (.bin) prefix

finalize(idx_path: str) None#

Clean up and write the index (.idx) file

Parameters:

idx_path (str) – The path to the index file
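The intended call order is `add_item` for each sequence, `end_document` at each document boundary, and `finalize` once at the end. The in-memory stand-in below illustrates that bookkeeping. `TinyBuilderSketch` is hypothetical, takes plain lists instead of `torch.Tensor`, returns the index arrays instead of writing an .idx file, and assumes the document indices start with 0:

```python
import numpy as np


class TinyBuilderSketch:
    """Hypothetical in-memory stand-in for the builder's bookkeeping."""

    def __init__(self, dtype=np.int32):
        self.dtype = dtype
        self.data = bytearray()      # plays the role of the .bin file
        self.sequence_lengths = []
        self.document_indices = [0]  # assumed to start at 0

    def add_item(self, tokens):
        array = np.asarray(tokens, dtype=self.dtype)
        self.data += array.tobytes(order="C")
        self.sequence_lengths.append(array.size)

    def end_document(self):
        # Record how many sequences exist once this document is closed.
        self.document_indices.append(len(self.sequence_lengths))

    def finalize(self):
        # The real builder writes the index file here; the sketch returns it.
        return list(self.sequence_lengths), list(self.document_indices)


builder = TinyBuilderSketch()
builder.add_item([1, 2, 3])
builder.add_item([4, 5])
builder.end_document()  # document 0 holds sequences 0 and 1
builder.add_item([6])
builder.end_document()  # document 1 holds sequence 2
lengths, boundaries = builder.finalize()
```

Under these assumptions the boundaries `[0, 2, 3]` say that document 0 spans sequences 0 to 1 and document 1 spans sequence 2.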

core.datasets.indexed_dataset.get_idx_path(path_prefix: str) str#

Get the path to the index file from the prefix

Parameters:

path_prefix (str) – The prefix

Returns:

The path to the index file

Return type:

str

core.datasets.indexed_dataset.get_bin_path(path_prefix: str) str#

Get the path to the data file from the prefix

Parameters:

path_prefix (str) – The prefix

Returns:

The path to the data file

Return type:

str
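Both helpers derive a file path from a shared prefix. A minimal sketch, assuming the paths are formed by appending the `.idx` and `.bin` suffixes to the prefix. The function names below are hypothetical stand-ins, and `dataset_exists` mirrors the documented meaning of `IndexedDataset.exists` for local files only:

```python
import os
import tempfile


def idx_path(path_prefix: str) -> str:
    # Assumption: the index file is the prefix plus ".idx".
    return path_prefix + ".idx"


def bin_path(path_prefix: str) -> str:
    # Assumption: the data file is the prefix plus ".bin".
    return path_prefix + ".bin"


def dataset_exists(path_prefix: str) -> bool:
    # A dataset is usable only when both files are present.
    return os.path.exists(idx_path(path_prefix)) and os.path.exists(bin_path(path_prefix))


with tempfile.TemporaryDirectory() as tmp:
    prefix = os.path.join(tmp, "corpus_text_document")
    open(bin_path(prefix), "wb").close()
    only_bin = dataset_exists(prefix)
    open(idx_path(prefix), "wb").close()
    both_files = dataset_exists(prefix)
```

Passing the prefix around, rather than two separate paths, keeps the index and data files paired.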