nemo_automodel.components.datasets.llm.megatron.indexed_dataset


A self-contained port of Megatron-Core’s indexed dataset loader.

Supports the original mmap and file-pointer readers for local *.bin / *.idx pairs, plus optional streaming readers for object storage (S3 and MSC).

All three calls below are equivalent for local data:

from nemo_automodel.components.datasets.llm.megatron.indexed_dataset import IndexedDataset

ds = IndexedDataset("/path/to/shard_00_text_document")
print(len(ds), ds[0][:20])

ds = IndexedDataset("/path/to/shard_00_text_document.bin")
print(len(ds), ds[0][:20])

ds = IndexedDataset("/path/to/shard_00_text_document.idx")
print(len(ds), ds[0][:20])

For object-storage data, pass an ObjectStorageConfig:

from nemo_automodel.components.datasets.llm.megatron.indexed_dataset import IndexedDataset, ObjectStorageConfig

cfg = ObjectStorageConfig(path_to_idx_cache="/tmp/idx_cache")
ds = IndexedDataset("s3://bucket/path/shard_00_text_document", object_storage_config=cfg)

Module Contents

Classes

| Name | Description |
| --- | --- |
| DType | The NumPy data type Enum for reading the IndexedDataset indices |
| IndexedDataset | A fast, on-disk dataset backed by Megatron-style index + binary files. |
| IndexedDatasetBuilder | Builder class for the IndexedDataset class |
| ObjectStorageConfig | Configuration for reading .bin/.idx files from object storage. |
| _BinReader | Abstract class to read the data (.bin) file |
| _FileBinReader | A _BinReader that reads from the data (.bin) file using a file pointer |
| _IndexReader | Object class to read the index (.idx) file |
| _IndexWriter | Object class to write the index (.idx) file |
| _MMapBinReader | A _BinReader that memory maps the data (.bin) file |
| _MultiStorageClientBinReader | Read .bin data via NVIDIA’s multi_storage_client. |
| _S3BinReader | Stream .bin data from S3 via chunked ranged GetObject calls. |

Functions

| Name | Description |
| --- | --- |
| _cache_index_file | Download .idx from object storage to local_path. |
| _get_index_cache_path | Return the local cache path for idx_path under path_to_idx_cache. |
| _is_object_storage_path | Return True if path is an s3:// or msc:// URI. |
| _normalize_prefix | - |
| _parse_s3_path | Split an s3://bucket/key URI into (bucket, key). |
| get_bin_path | Return the binary-data path for a Megatron dataset prefix. |
| get_idx_path | Return the index-file path for a Megatron dataset prefix. |

Data

OBJECT_STORAGE_BIN_READERS

_INDEX_HEADER

_MSC_PREFIX

_S3_PREFIX

logger

API

class nemo_automodel.components.datasets.llm.megatron.indexed_dataset.DType

Bases: enum.Enum

The NumPy data type Enum for reading the IndexedDataset indices

float32 = 7
float64 = 6
int16 = 3
int32 = 4
int64 = 5
int8 = 2
uint16 = 8
uint8 = 1
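
To illustrate how an integer code stored alongside the data could resolve to an element type, here is a minimal stand-in that reuses the codes listed above. `DTypeSketch`, `ITEMSIZE`, and `itemsize_from_code` are hypothetical names for illustration only; the real enum works with NumPy types and exposes its own helpers (the `_IndexReader` attributes reference `DType.dtype_from_code` and `DType.size`), whereas this sketch only tracks byte widths.

```python
from enum import Enum


class DTypeSketch(Enum):
    """Stand-in that mirrors the documented DType codes (not the library class)."""

    uint8 = 1
    int8 = 2
    int16 = 3
    int32 = 4
    int64 = 5
    float64 = 6
    float32 = 7
    uint16 = 8


# Width in bytes of one element for each member (hypothetical helper table).
ITEMSIZE = {
    DTypeSketch.uint8: 1,
    DTypeSketch.int8: 1,
    DTypeSketch.int16: 2,
    DTypeSketch.int32: 4,
    DTypeSketch.int64: 8,
    DTypeSketch.float64: 8,
    DTypeSketch.float32: 4,
    DTypeSketch.uint16: 2,
}


def itemsize_from_code(code: int) -> int:
    """Resolve a stored integer code to the element width in bytes."""
    return ITEMSIZE[DTypeSketch(code)]
```

Looking up a member by value, as in `DTypeSketch(4)`, raises `ValueError` for an unknown code, which is a convenient way to reject a corrupt header early.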
class nemo_automodel.components.datasets.llm.megatron.indexed_dataset.IndexedDataset(
path_prefix: str,
multimodal: bool = False,
mmap: bool = True,
object_storage_config: typing.Optional[nemo_automodel.components.datasets.llm.megatron.indexed_dataset.ObjectStorageConfig] = None
)

Bases: Dataset

A fast, on-disk dataset backed by Megatron-style index + binary files.

nemo_automodel.components.datasets.llm.megatron.indexed_dataset.IndexedDataset.__getitem__(
idx: typing.Union[int, numpy.integer, slice]
) -> typing.Union[numpy.ndarray, typing.Tuple[numpy.ndarray, typing.Any], typing.List[numpy.ndarray], typing.Tuple[typing.List[numpy.ndarray], numpy.ndarray]]
nemo_automodel.components.datasets.llm.megatron.indexed_dataset.IndexedDataset.__len__() -> int
nemo_automodel.components.datasets.llm.megatron.indexed_dataset.IndexedDataset.exists(
path_prefix: str
) -> bool
staticmethod
nemo_automodel.components.datasets.llm.megatron.indexed_dataset.IndexedDataset.get(
idx: int,
offset: int = 0,
length: typing.Optional[int] = None
) -> typing.Union[numpy.ndarray, typing.Tuple[numpy.ndarray, typing.Any]]
nemo_automodel.components.datasets.llm.megatron.indexed_dataset.IndexedDataset.initialize(
path_prefix: str,
multimodal: bool,
mmap: bool,
object_storage_config: typing.Optional[nemo_automodel.components.datasets.llm.megatron.indexed_dataset.ObjectStorageConfig] = None
) -> None
class nemo_automodel.components.datasets.llm.megatron.indexed_dataset.IndexedDatasetBuilder(
bin_path: str,
dtype: typing.Type[numpy.number] = numpy.int32,
multimodal: bool = False
)

Builder class for the IndexedDataset class

Parameters:

bin_path
str

The path to the data (.bin) file

dtype
Type[numpy.number], defaults to numpy.int32

The dtype of the index file. Defaults to numpy.int32.

multimodal
bool, defaults to False

Whether the dataset is multimodal. Defaults to False.

data_file = open(bin_path, 'wb')
document_indices = [0]
sequence_lengths = []
sequence_modes = [] if self.multimodal else None
nemo_automodel.components.datasets.llm.megatron.indexed_dataset.IndexedDatasetBuilder.add_document(
tensor: torch.Tensor,
lengths: typing.List[int],
modes: typing.Optional[typing.List[int]] = None
) -> None

Add an entire document to the dataset

Parameters:

tensor
torch.Tensor

The document to add

lengths
List[int]

The lengths of each item in the document

modes
Optional[List[int]], defaults to None

The modes for each item in the document. Defaults to None.

nemo_automodel.components.datasets.llm.megatron.indexed_dataset.IndexedDatasetBuilder.add_index(
path_prefix: str
) -> None

Add an entire IndexedDataset to the dataset

Parameters:

path_prefix
str

The index (.idx) and data (.bin) prefix

nemo_automodel.components.datasets.llm.megatron.indexed_dataset.IndexedDatasetBuilder.add_item(
tensor: torch.Tensor,
mode: int = 0
) -> None

Add a single item to the dataset

Parameters:

tensor
torch.Tensor

The item to add to the data file

mode
int, defaults to 0

The mode for the item. Defaults to 0.

nemo_automodel.components.datasets.llm.megatron.indexed_dataset.IndexedDatasetBuilder.end_document() -> None

Finalize the document, for use with IndexedDatasetBuilder.add_item

nemo_automodel.components.datasets.llm.megatron.indexed_dataset.IndexedDatasetBuilder.finalize(
idx_path: str
) -> None

Clean up and write the index (.idx) file

Parameters:

idx_path
str

The path to the index file
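
The bookkeeping behind `add_item` and `end_document` can be pictured with a small in-memory stand-in. This is a sketch under assumptions, not the library class: `TinyBuilder` is a hypothetical name, it takes plain Python lists instead of `torch.Tensor`, it writes to a `BytesIO` rather than a real .bin file, and it assumes that a document boundary is recorded as the number of sequences written so far (consistent with `document_indices` starting at `[0]`).

```python
import io
import struct
from typing import List


class TinyBuilder:
    """Hypothetical in-memory stand-in for the builder's bookkeeping."""

    def __init__(self) -> None:
        self.data_file = io.BytesIO()            # stands in for open(bin_path, "wb")
        self.sequence_lengths: List[int] = []
        self.document_indices: List[int] = [0]

    def add_item(self, tokens: List[int]) -> None:
        # Append the raw little-endian int32 payload and remember its length.
        self.data_file.write(struct.pack(f"<{len(tokens)}i", *tokens))
        self.sequence_lengths.append(len(tokens))

    def end_document(self) -> None:
        # Assumption: a boundary is the count of sequences seen so far.
        self.document_indices.append(len(self.sequence_lengths))


builder = TinyBuilder()
builder.add_item([1, 2, 3])   # document 0, sequence 0
builder.add_item([4, 5])      # document 0, sequence 1
builder.end_document()
builder.add_item([6])         # document 1, sequence 0
builder.end_document()
```

With this convention, document `d` spans sequences `document_indices[d]` up to, but not including, `document_indices[d + 1]`.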

class nemo_automodel.components.datasets.llm.megatron.indexed_dataset.ObjectStorageConfig(
path_to_idx_cache: str,
bin_chunk_nbytes: int = 256 * 1024 * 1024
)
Dataclass

Configuration for reading .bin/.idx files from object storage.

bin_chunk_nbytes: int = 256 * 1024 * 1024
path_to_idx_cache: str
class nemo_automodel.components.datasets.llm.megatron.indexed_dataset._BinReader()
Abstract

Abstract class to read the data (.bin) file

nemo_automodel.components.datasets.llm.megatron.indexed_dataset._BinReader.read(
dtype: typing.Type[numpy.number],
count: int,
offset: int
) -> numpy.ndarray
abstract

Read bytes into a numpy array.

Parameters:

dtype
Type[numpy.number]

Data-type of the returned array.

count
int

Number of items to read.

offset
int

Start reading from this offset (in bytes).

Returns: numpy.ndarray

numpy.ndarray: An array with count items and data-type dtype constructed from reading bytes from the data file starting at offset.

class nemo_automodel.components.datasets.llm.megatron.indexed_dataset._FileBinReader(
bin_path: str
)

Bases: _BinReader

A _BinReader that reads from the data (.bin) file using a file pointer

nemo_automodel.components.datasets.llm.megatron.indexed_dataset._FileBinReader.read(
dtype: typing.Type[numpy.number],
count: int,
offset: int
) -> numpy.ndarray

Read bytes into a numpy array.

Parameters:

dtype
Type[numpy.number]

Data-type of the returned array.

count
int

Number of items to read.

offset
int

Start reading from this offset (in bytes).

Returns: numpy.ndarray

numpy.ndarray: An array with count items and data-type dtype constructed from reading bytes from the data file starting at offset.
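
The file-pointer strategy (seek to a byte offset, then read exactly the bytes needed) can be sketched with the standard library alone. `read_items` is a hypothetical helper; it takes a `struct` format character in place of a NumPy dtype and returns a tuple rather than a `numpy.ndarray`, so treat it as an illustration of the access pattern rather than the reader's implementation.

```python
import os
import struct
import tempfile
from typing import Tuple


def read_items(bin_path: str, fmt: str, count: int, offset: int) -> Tuple[int, ...]:
    """Read `count` little-endian items of struct format `fmt` at byte `offset`."""
    layout = f"<{count}{fmt}"
    with open(bin_path, "rb") as f:
        f.seek(offset)                                 # jump to the requested byte offset
        payload = f.read(struct.calcsize(layout))      # read only the bytes required
    return struct.unpack(layout, payload)


# Write six int32 tokens, then read three of them starting at the third token.
with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp:
    tmp.write(struct.pack("<6i", 10, 11, 12, 13, 14, 15))

tokens = read_items(tmp.name, "i", count=3, offset=2 * 4)
os.remove(tmp.name)
```

Note that `offset` is in bytes, not items, which is why the third int32 token sits at offset `2 * 4`.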

class nemo_automodel.components.datasets.llm.megatron.indexed_dataset._IndexReader(
idx_path: str,
multimodal: bool
)

Object class to read the index (.idx) file

Parameters:

idx_path
str

The path to the index file

multimodal
bool

Whether the dataset is multimodal

_buffer = memoryview(self._mmap)
_mmap = numpy.memmap(idx_path, mode='r', order='C')
document_count = struct.unpack('<Q', f.read(8))[0]
document_indices
dtype = DType.dtype_from_code(code)
dtype_size = DType.size(self.dtype)
sequence_count = struct.unpack('<Q', f.read(8))[0]
sequence_lengths
sequence_modes: Optional[ndarray] = None
sequence_pointers
nemo_automodel.components.datasets.llm.megatron.indexed_dataset._IndexReader.__del__() -> None

Clean up the object

nemo_automodel.components.datasets.llm.megatron.indexed_dataset._IndexReader.__getitem__(
idx: int
) -> typing.Tuple[numpy.int32, numpy.int64, typing.Optional[numpy.int8]]

Return the pointer, length, and mode at the index

Parameters:

idx
int

The index into the dataset

Returns: Tuple[numpy.int32, numpy.int64, Optional[numpy.int8]]

Tuple[numpy.int32, numpy.int64, Optional[numpy.int8]]: The pointer, length and mode at the index

nemo_automodel.components.datasets.llm.megatron.indexed_dataset._IndexReader.__len__() -> int

Get the number of sequences in the dataset

Returns: int

The number of sequences in the dataset
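
The attribute values above show the counts being read as little-endian unsigned 64-bit integers. A header round trip along those lines might look like the following sketch. The magic bytes are the value documented for `_INDEX_HEADER`; the version field, the one-byte dtype code, and the exact field order are assumptions based on the upstream Megatron-Core layout and should be checked against the source before relying on them. `write_header` and `read_header` are hypothetical helpers.

```python
import io
import struct

INDEX_HEADER = b"MMIDIDX\x00\x00"  # value documented for _INDEX_HEADER


def write_header(stream, dtype_code: int, sequence_count: int, document_count: int) -> None:
    stream.write(INDEX_HEADER)
    stream.write(struct.pack("<Q", 1))              # assumed version field
    stream.write(struct.pack("<B", dtype_code))     # assumed one-byte dtype code
    stream.write(struct.pack("<Q", sequence_count))
    stream.write(struct.pack("<Q", document_count))


def read_header(stream) -> dict:
    magic = stream.read(len(INDEX_HEADER))
    if magic != INDEX_HEADER:
        raise ValueError("not a Megatron-style index file")
    version = struct.unpack("<Q", stream.read(8))[0]
    dtype_code = struct.unpack("<B", stream.read(1))[0]
    sequence_count = struct.unpack("<Q", stream.read(8))[0]
    document_count = struct.unpack("<Q", stream.read(8))[0]
    return {
        "version": version,
        "dtype_code": dtype_code,
        "sequence_count": sequence_count,
        "document_count": document_count,
    }


buf = io.BytesIO()
write_header(buf, dtype_code=4, sequence_count=3, document_count=3)
buf.seek(0)
header = read_header(buf)
```

Checking the magic bytes first means a mismatched or truncated file fails with a clear error instead of producing nonsensical counts.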

class nemo_automodel.components.datasets.llm.megatron.indexed_dataset._IndexWriter(
idx_path: str,
dtype: typing.Type[numpy.number]
)

Object class to write the index (.idx) file

Parameters:

idx_path
str

The path to the index file

dtype
Type[numpy.number]

The dtype of the index file

nemo_automodel.components.datasets.llm.megatron.indexed_dataset._IndexWriter.__enter__() -> '_IndexWriter'

Enter the context introduced by the ‘with’ keyword

Returns: '_IndexWriter'

The instance

nemo_automodel.components.datasets.llm.megatron.indexed_dataset._IndexWriter.__exit__(
exc_type: typing.Optional[typing.Type[BaseException]],
exc_val: typing.Optional[BaseException],
exc_tb: typing.Optional[types.TracebackType]
) -> typing.Optional[bool]

Exit the context introduced by the ‘with’ keyword

Parameters:

exc_type
Optional[Type[BaseException]]

Exception type

exc_val
Optional[BaseException]

Exception value

exc_tb
Optional[TracebackType]

Exception traceback object

Returns: Optional[bool]

Optional[bool]: Whether to silence the exception

nemo_automodel.components.datasets.llm.megatron.indexed_dataset._IndexWriter._sequence_pointers(
sequence_lengths: typing.List[int]
) -> typing.List[int]

Build the sequence pointers per the sequence lengths and dtype size

Parameters:

sequence_lengths
List[int]

The length of each sequence

Returns: List[int]

List[int]: The pointer to the beginning of each sequence
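
As a worked example of building pointers from sequence lengths and the dtype size, each pointer can be computed as the running total of the preceding lengths multiplied by the element width in bytes. The function below is a standalone sketch with a hypothetical `itemsize` parameter; the method itself takes the dtype size from the writer's configured dtype.

```python
from itertools import accumulate
from typing import List


def sequence_pointers(sequence_lengths: List[int], itemsize: int) -> List[int]:
    """Byte offset of each sequence: running sum of earlier lengths times itemsize."""
    # accumulate(..., initial=0) yields 0, l0, l0+l1, ...; drop the final total.
    starts = list(accumulate(sequence_lengths, initial=0))[:-1]
    return [start * itemsize for start in starts]


# Three int32 sequences of 3, 2, and 1 tokens start at bytes 0, 12, and 20.
pointers = sequence_pointers([3, 2, 1], itemsize=4)
```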

nemo_automodel.components.datasets.llm.megatron.indexed_dataset._IndexWriter.write(
sequence_lengths: typing.List[int],
sequence_modes: typing.Optional[typing.List[int]],
document_indices: typing.List[int]
) -> None

Write the index (.idx) file

Parameters:

sequence_lengths
List[int]

The length of each sequence

sequence_modes
Optional[List[int]]

The mode of each sequence

document_indices
List[int]

The sequence indices demarcating the end of each document

class nemo_automodel.components.datasets.llm.megatron.indexed_dataset._MMapBinReader(
bin_path: str
)

Bases: _BinReader

A _BinReader that memory maps the data (.bin) file

_buffer = memoryview(self._mmap.data)
_file = open(bin_path, 'rb')
_mmap = numpy.memmap(self._file, mode='r', order='C')
nemo_automodel.components.datasets.llm.megatron.indexed_dataset._MMapBinReader.__del__() -> None

Clean up the object

nemo_automodel.components.datasets.llm.megatron.indexed_dataset._MMapBinReader.read(
dtype: typing.Type[numpy.number],
count: int,
offset: int
) -> numpy.ndarray

Read bytes into a numpy array.

Parameters:

dtype
Type[numpy.number]

Data-type of the returned array.

count
int

Number of items to read.

offset
int

Start reading from this offset (in bytes).

Returns: numpy.ndarray

numpy.ndarray: An array with count items and data-type dtype constructed from reading bytes from the data file starting at offset.
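
For comparison with the file-pointer reader, the memory-mapped access pattern can be sketched with the standard-library `mmap` module. The class documented here uses `numpy.memmap`; this stand-in uses `mmap.mmap` and `struct.unpack_from` only so that the example runs without NumPy, and it returns a tuple rather than an array.

```python
import mmap
import os
import struct
import tempfile

# Write six int32 tokens to a temporary .bin file.
with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp:
    tmp.write(struct.pack("<6i", 10, 11, 12, 13, 14, 15))

with open(tmp.name, "rb") as f:
    mapped = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    try:
        # unpack_from decodes directly from the mapping at a byte offset,
        # so no explicit seek/read pair is needed.
        tokens = struct.unpack_from("<3i", mapped, 2 * 4)
    finally:
        mapped.close()

os.remove(tmp.name)
```

A mapping is typically preferable for repeated random access to a local file, since the operating system pages data in on demand and can reuse it across reads.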

class nemo_automodel.components.datasets.llm.megatron.indexed_dataset._MultiStorageClientBinReader(
bin_path: str,
object_storage_config: nemo_automodel.components.datasets.llm.megatron.indexed_dataset.ObjectStorageConfig
)

Bases: _BinReader

Read .bin data via NVIDIA’s multi_storage_client.

nemo_automodel.components.datasets.llm.megatron.indexed_dataset._MultiStorageClientBinReader.read(
dtype: typing.Type[numpy.number],
count: int,
offset: int
) -> numpy.ndarray

Read count elements of dtype starting at byte offset.

class nemo_automodel.components.datasets.llm.megatron.indexed_dataset._S3BinReader(
bin_path: str,
object_storage_config: nemo_automodel.components.datasets.llm.megatron.indexed_dataset.ObjectStorageConfig
)

Bases: _BinReader

Stream .bin data from S3 via chunked ranged GetObject calls.

A single in-memory chunk (sized by ObjectStorageConfig.bin_chunk_nbytes) is cached so consecutive reads within the same chunk avoid network round-trips. Random-access reads outside the current chunk trigger a new ranged GetObject.

_cache: Optional[bytes] = None
_cache_bytes_end: int = 0
_cache_bytes_start: int = 0
_cache_nbytes = object_storage_config.bin_chunk_nbytes
_client = boto3.client('s3')
nemo_automodel.components.datasets.llm.megatron.indexed_dataset._S3BinReader.__del__() -> None
nemo_automodel.components.datasets.llm.megatron.indexed_dataset._S3BinReader._extract_from_cache(
offset: int,
size: int
) -> bytes
nemo_automodel.components.datasets.llm.megatron.indexed_dataset._S3BinReader.read(
dtype: typing.Type[numpy.number],
count: int,
offset: int
) -> numpy.ndarray

Read count elements of dtype starting at byte offset.
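
The single-chunk caching behaviour described for this reader can be sketched without any network access. `ChunkCache` and `fake_ranged_get` are hypothetical: the fetch callable stands in for a ranged GetObject call, and the sketch starts each chunk at the requested offset, which is a simplification (the real reader may align chunks differently).

```python
from typing import Callable, List, Optional, Tuple


class ChunkCache:
    """Hypothetical single-chunk cache in front of a ranged-read function."""

    def __init__(self, fetch: Callable[[int, int], bytes], chunk_nbytes: int) -> None:
        self._fetch = fetch                  # fetch(start, end) -> bytes in [start, end)
        self._chunk_nbytes = chunk_nbytes
        self._cache: Optional[bytes] = None
        self._start = 0
        self._end = 0

    def read(self, offset: int, size: int) -> bytes:
        hit = (
            self._cache is not None
            and self._start <= offset
            and offset + size <= self._end
        )
        if not hit:
            # Miss: fetch a fresh chunk that begins at the requested offset.
            nbytes = max(size, self._chunk_nbytes)
            self._cache = self._fetch(offset, offset + nbytes)
            self._start, self._end = offset, offset + len(self._cache)
        begin = offset - self._start
        return self._cache[begin:begin + size]


blob = bytes(range(64))
calls: List[Tuple[int, int]] = []


def fake_ranged_get(start: int, end: int) -> bytes:
    calls.append((start, end))               # record each simulated round-trip
    return blob[start:end]


cache = ChunkCache(fake_ranged_get, chunk_nbytes=16)
first = cache.read(0, 4)     # miss: fetches bytes [0, 16)
second = cache.read(8, 4)    # hit: served from the cached chunk
third = cache.read(40, 4)    # miss: fetches bytes [40, 56)
```

Sequential access benefits most from this design; a purely random access pattern would refetch a chunk on nearly every read, so a smaller `bin_chunk_nbytes` may be worth considering in that case.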

nemo_automodel.components.datasets.llm.megatron.indexed_dataset._cache_index_file(
remote_path: str,
local_path: str
) -> None

Download .idx from object storage to local_path.

Rank 0 performs the download and other ranks wait on a torch.distributed barrier. If the local file already exists this is a no-op.

Raises:

  • ImportError: If the relevant client library (boto3 for s3:// or multi_storage_client for msc://) is not installed.
  • ValueError: If remote_path is neither an s3:// nor an msc:// URI.
nemo_automodel.components.datasets.llm.megatron.indexed_dataset._get_index_cache_path(
idx_path: str,
object_storage_config: nemo_automodel.components.datasets.llm.megatron.indexed_dataset.ObjectStorageConfig
) -> str

Return the local cache path for idx_path under path_to_idx_cache.

nemo_automodel.components.datasets.llm.megatron.indexed_dataset._is_object_storage_path(
path: str
) -> bool

Return True if path is an s3:// or msc:// URI.

nemo_automodel.components.datasets.llm.megatron.indexed_dataset._normalize_prefix(
path_prefix: str
) -> str
nemo_automodel.components.datasets.llm.megatron.indexed_dataset._parse_s3_path(
path: str
) -> typing.Tuple[str, str]

Split an s3://bucket/key URI into (bucket, key).
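
A minimal sketch of the two URI helpers, assuming plain prefix checks and a split on the first slash after the scheme. The function names drop the leading underscore to make clear that these are illustrative stand-ins, and the error handling is an assumption rather than documented behaviour.

```python
from typing import Tuple

S3_PREFIX = "s3://"     # value documented for _S3_PREFIX
MSC_PREFIX = "msc://"   # value documented for _MSC_PREFIX


def is_object_storage_path(path: str) -> bool:
    """True for s3:// or msc:// URIs, False for local paths."""
    return path.startswith((S3_PREFIX, MSC_PREFIX))


def parse_s3_path(path: str) -> Tuple[str, str]:
    """Split s3://bucket/key into (bucket, key)."""
    if not path.startswith(S3_PREFIX):
        raise ValueError(f"expected an s3:// URI, got {path!r}")
    bucket, _, key = path[len(S3_PREFIX):].partition("/")
    return bucket, key


bucket, key = parse_s3_path("s3://bucket/path/shard_00_text_document.bin")
```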

nemo_automodel.components.datasets.llm.megatron.indexed_dataset.get_bin_path(
path_prefix: str
) -> str

Return the binary-data path for a Megatron dataset prefix.

nemo_automodel.components.datasets.llm.megatron.indexed_dataset.get_idx_path(
path_prefix: str
) -> str

Return the index-file path for a Megatron dataset prefix.
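
Because a bare prefix, a .bin path, and an .idx path are all accepted as equivalent, the path helpers plausibly strip a known suffix before appending the one they need. The following is a sketch of that idea with hypothetical names; the actual `_normalize_prefix` is undocumented here and may handle additional cases.

```python
def normalize_prefix(path_prefix: str) -> str:
    """Strip a trailing .bin or .idx so every spelling maps to one prefix."""
    for suffix in (".bin", ".idx"):
        if path_prefix.endswith(suffix):
            return path_prefix[: -len(suffix)]
    return path_prefix


def bin_path_for(path_prefix: str) -> str:
    return normalize_prefix(path_prefix) + ".bin"


def idx_path_for(path_prefix: str) -> str:
    return normalize_prefix(path_prefix) + ".idx"


# All three spellings resolve to the same pair of files.
spellings = [
    "/path/to/shard_00_text_document",
    "/path/to/shard_00_text_document.bin",
    "/path/to/shard_00_text_document.idx",
]
resolved = {(bin_path_for(p), idx_path_for(p)) for p in spellings}
```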

nemo_automodel.components.datasets.llm.megatron.indexed_dataset.OBJECT_STORAGE_BIN_READERS: Dict[str, Type[_BinReader]] = {'s3': _S3BinReader, 'msc': _MultiStorageClientBinReader}
nemo_automodel.components.datasets.llm.megatron.indexed_dataset._INDEX_HEADER = b'MMIDIDX\x00\x00'
nemo_automodel.components.datasets.llm.megatron.indexed_dataset._MSC_PREFIX = 'msc://'
nemo_automodel.components.datasets.llm.megatron.indexed_dataset._S3_PREFIX = 's3://'
nemo_automodel.components.datasets.llm.megatron.indexed_dataset.logger = logging.getLogger(__name__)