bridge.data.datasets.utils#

Module Contents#

Classes#

_TextMemMapDataset

Allow per-line lazy access to multiple text files using numpy memmap.

_JSONLMemMapDataset

Memory-mapped iteration over a JSONL file.

_OnlineSampleMapping

This class replaces NeMo’s get_samples_mapping function, which pre-computes the full mapping up front. It creates a sample mapping for a given number of samples on the fly, including pseudo-random shuffling, and allows down- or up-sampling of a dataset. Shuffling is pseudo-random: blocks are shuffled, and each block is shuffled internally.

Functions#

build_index_from_memdata

Build an index of delimiter positions between samples in a memmap. A custom implementation can be provided externally (via build_index_fn).

build_index_files

Auxiliary method to build multiple index files.

handle_index

Remaps negative indices and handles numpy int indices.

rank_0_prepare_data

This function checks whether it is invoked inside the builder method “prepare_data”, which runs only on rank 0. TextMemMapDataset contains a torch.distributed.barrier operation, so if it ran inside the single-process prepare_data method, the barrier would hang forever.

_get_samples_mapping

Get a list that maps a sample index to a starting sentence index, an end sentence index, and a length.

_make_indexed_dataset_compatibility

Make any dataset compatible with IndexedDataset for Megatron samples mapping.

_preprocess

Given a conversation list, this transform adds a '### ' begin signal and a '\n' end signal to each sentence, concatenates the conversation, tokenizes it, and masks the target copy with IGNORE_INDEX.

_mask_targets

This function masks the tokens so the loss is computed only on the non-masked role’s responses. For ‘TEXT_TO_VALUE’ type, the loss is computed on the value attributes.

_get_header_conversation_type_mask_role

_add_speaker_and_signal

_response_value_formater

_identify_start_index_of_subsequence

Find the location of a small tensor within a large tensor. E.g. small = [1, 3], large = [2, 3, 1, 3] returns 2; small = [3, 2], large = [2, 3, 1, 3] returns -1.

_build_memmap_index_files

Helper function to build an index file.

_index_fn

Return base file name of index files.

_index_file_exists

Helper function to test whether an index file exists.

_deallocate_indexed_dataset_memory

Deallocate memory of an IndexedDataset.

Data#

API#

bridge.data.datasets.utils.logger#

‘getLogger(…)’

bridge.data.datasets.utils.PREFIX_STR#

‘\x00’

bridge.data.datasets.utils.IGNORE_INDEX#

None

bridge.data.datasets.utils.SYSTEM_TOKEN#

‘System’

bridge.data.datasets.utils.TYPE_INSTRUCTION#

None

bridge.data.datasets.utils.__idx_version__#

‘0.2’

bridge.data.datasets.utils.__idx_suffix__#

‘idx’

bridge.data.datasets.utils.build_index_from_memdata(fn, newline_int)#

Build an index of delimiter positions between samples in a memmap. A custom implementation can be provided externally (via build_index_fn).

Returns a 1D array of ints.
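
As a rough sketch, the default index build amounts to locating every delimiter byte in the memory-mapped file. The function name below is illustrative, not part of this module; only np.memmap and np.where are assumed:

```python
import numpy as np

def find_newline_positions(fn: str, newline_int: int = 10) -> np.ndarray:
    """Sketch of a build_index_fn-compatible index builder."""
    # Memory-map the raw bytes of the file as int8, as load_file() does.
    mdata = np.memmap(fn, dtype=np.int8, mode="r")
    # Every position holding the delimiter byte marks the end of a sample.
    midx = np.where(mdata == newline_int)[0]
    return midx.astype(np.int64)
```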

class bridge.data.datasets.utils._TextMemMapDataset(
dataset_paths: List[str],
newline_int: Optional[int] = 10,
header_lines: Optional[int] = 0,
workers: Optional[int] = None,
tokenizer: Optional[Type[megatron.bridge.training.tokenizers.tokenizer.MegatronTokenizer]] = None,
build_index_fn: Optional[Callable[[str, Optional[int]], bool]] = build_index_from_memdata,
sort_dataset_paths: Optional[bool] = True,
index_mapping_dir: Optional[str] = None,
)#

Bases: torch.utils.data.Dataset

Allow per-line lazy access to multiple text files using numpy memmap.

Initialization

Parameters:
  • dataset_paths – list of JSONL file paths.

  • newline_int – ASCII code to use to interpret newlines in file.

  • header_lines – number of header lines in JSON files.

  • workers – number of workers to use for creating index files.

  • tokenizer – tokenizer to use to convert text to tokens.

  • build_index_fn – a callable build_index_fn(fn, newline_int) -> midx [np.array] that returns the index of newlines in file fn. Must be pickleable (to be used in multiprocessing.Pool.map).

  • sort_dataset_paths – whether to sort datasets by paths.

  • index_mapping_dir – directory to save the index mapping to. If None, will write to the same folder as the dataset.

__del__()#
__len__()#
__getitem__(idx)#

Return a string from the binary memmap.

_fetch_sample_from_memmap(mdata, i, j)#

Fetches the text sample. Can be overridden by child classes to support loading of partial samples and alternative decode methods.

_build_data_from_text(text)#

Allows child classes to modify the parsing of raw text prior to tokenization.

load_file(fn, index_mapping_dir: Optional[str] = None)#

Loads a text file as np.int8.

Returns:

mdata – memory map of np.int8
midx – indices pointing to the end-of-line (or end-of-file) positions
size – number of lines in the file
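
A hedged usage sketch (the file names are placeholders; with no tokenizer configured, __getitem__ returns the decoded line as a string):

```python
# Placeholder paths; each file holds one sample per line.
dataset = _TextMemMapDataset(
    dataset_paths=["shard0.txt", "shard1.txt"],
    newline_int=10,   # samples are delimited by '\n'
    header_lines=0,   # no header rows to skip
    workers=2,        # parallel index-file construction
)
print(len(dataset))   # total number of lines across both files
print(dataset[0])     # first line, read lazily from the memmap
```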

class bridge.data.datasets.utils._JSONLMemMapDataset(
dataset_paths: List[str],
newline_int: Optional[int] = 10,
header_lines: Optional[int] = 0,
workers: Optional[int] = None,
tokenizer: Optional[Type[megatron.bridge.training.tokenizers.tokenizer.MegatronTokenizer]] = None,
sort_dataset_paths: Optional[bool] = True,
index_mapping_dir: Optional[str] = None,
)#

Bases: bridge.data.datasets.utils._TextMemMapDataset

Memory-mapped iteration over a JSONL file.

Initialization

Parameters:
  • dataset_paths – list of JSONL file paths.

  • newline_int – ASCII code to use to interpret newlines in file.

  • header_lines – number of header lines in JSON files.

  • workers – number of workers to use for creating index files.

  • tokenizer – tokenizer to use to convert text to tokens.

  • sort_dataset_paths – whether to sort datasets by paths.

  • index_mapping_dir – directory to save the index mapping to. If None, will write to the same folder as the dataset.

_build_data_from_text(text)#

Return a dictionary of data based on a single JSON line.
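
In effect, this subclass layers JSON parsing on top of the raw line access, so indexing yields a dict rather than a string. A hedged sketch ("train.jsonl" is a placeholder path):

```python
dataset = _JSONLMemMapDataset(dataset_paths=["train.jsonl"])
sample = dataset[0]     # one JSON line, parsed into a Python dict
print(sorted(sample))   # its keys
```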

class bridge.data.datasets.utils._OnlineSampleMapping(
dataset_size: int,
num_samples: int,
block_size: int = 1000000,
cache_maxsize: int = 2,
seed: int = 1,
shuffle: bool = True,
truncate_to_block_boundary: bool = False,
)#

This class replaces NeMo’s get_samples_mapping function, which pre-computes the full mapping up front. It creates a sample mapping for a given number of samples on the fly, including pseudo-random shuffling, and allows down- or up-sampling of a dataset. Shuffling is pseudo-random: blocks are shuffled, and each block is shuffled internally.

Initialization

Parameters:
  • dataset_size (int) – Size of the dataset.

  • num_samples (int) – Number of samples the dataset should contain.

  • block_size (int) – Size of each sample block, used when shuffling the samples. If None, it is replaced with the dataset size.

  • cache_maxsize (int) – Maximum size of the blocks cache for the get_sample_block function.

  • seed (int) – Seed for the random number generator used for shuffling.

  • shuffle (bool) – Whether to shuffle the samples.

  • truncate_to_block_boundary (bool) – Whether to truncate the last block to the block boundary.

__str__()#
__getitem__(idx: int) int#
__len__() int#
__reduce__()#

Add support for pickling. Needed due to functools.lru_cache.

__reduce_ex__(protocol)#
get_sample_block(block_idx: int) numpy.ndarray#

Returns a block of samples of size self.block_size, shuffled if needed. NOTE: This method will be cached using functools.lru_cache for efficiency during construction.
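
A hedged usage sketch of the mapping (parameter values are illustrative):

```python
# Map a 10-entry dataset onto 25 training samples (upsampling),
# shuffling in blocks of 5 and then within each block.
mapping = _OnlineSampleMapping(
    dataset_size=10,
    num_samples=25,
    block_size=5,
    seed=1,
    shuffle=True,
)
print(len(mapping))  # 25
idx = mapping[7]     # dataset index backing training sample 7
```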

bridge.data.datasets.utils.build_index_files(
dataset_paths,
newline_int,
workers=None,
build_index_fn=build_index_from_memdata,
index_mapping_dir: str = None,
)#

Auxiliary method to build multiple index files.

bridge.data.datasets.utils.handle_index(dataset, idx)#

Remaps negative indices and handles numpy int indices.

Parameters:
  • dataset (Dataset) – dataset to index into

  • idx (int) – Index. Can include negative indices.

Returns:

Remapped and fully qualified index.

Return type:

int

Raises:

IndexError – If a negative index is out of range.

.. rubric:: Examples

>>> import numpy as np
>>> import torch
>>> from torch.utils.data import TensorDataset
>>> from nemo_chem.data.fasta_dataset import handle_index
>>> dataset = TensorDataset(torch.tensor(-np.arange(5)))
>>> handle_index(dataset, 1)
1
>>> handle_index(dataset, -2)
3

bridge.data.datasets.utils.rank_0_prepare_data() bool#

This function checks whether it is invoked inside the builder method “prepare_data”, which runs only on rank 0. TextMemMapDataset contains a torch.distributed.barrier operation, so if it ran inside the single-process prepare_data method, the barrier would hang forever.
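
One plausible way such a check is used (a sketch under that assumption, not the module's actual code): guard the collective so it only runs when every rank is participating.

```python
import torch

# Sketch: inside dataset/index construction.
if torch.distributed.is_initialized() and not rank_0_prepare_data():
    # All ranks are executing this path, so synchronizing is safe.
    torch.distributed.barrier()
# When rank_0_prepare_data() is True, only rank 0 is running, and a
# barrier would wait forever on the other (absent) ranks.
```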

bridge.data.datasets.utils._get_samples_mapping(
indexed_dataset,
data_prefix,
num_epochs,
max_num_samples,
max_seq_length,
short_seq_prob,
seed,
name,
binary_head,
index_mapping_dir: str = None,
samples_mapping: Any = None,
sanity_check_dist_workers: bool = True,
)#

Get a list that maps a sample index to a starting sentence index, an end sentence index, and a length.

bridge.data.datasets.utils._make_indexed_dataset_compatibility(dataset)#

Make any dataset compatible with IndexedDataset for Megatron samples mapping.

bridge.data.datasets.utils._preprocess(
source: dict,
tokenizer: megatron.bridge.training.tokenizers.tokenizer.MegatronTokenizer,
name_end_token_ids: int,
label_start_ids: list,
special_tokens: dict,
num_turn_start_tokens: int,
)#
Given a conversation list, this transform:

  1. Adds the signal '### ' at the beginning of each sentence, with the end signal '\n';

  2. Concatenates conversations together;

  3. Tokenizes the concatenated conversation;

  4. Makes a deepcopy as the target and masks human words with IGNORE_INDEX.
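
A simplified, self-contained sketch of steps 1–2 above (the '### ' and '\n' signals come from the docstring; the conversation schema and the speaker formatting are assumptions, with the real speaker handling living in _add_speaker_and_signal):

```python
from typing import Dict, List

def format_conversation(conversations: List[Dict[str, str]]) -> str:
    """Assumed schema: [{"from": "User", "value": "..."}, ...]."""
    begin_signal, end_signal = "### ", "\n"
    text = ""
    for turn in conversations:
        # Step 1: wrap each sentence in the begin/end signals.
        text += begin_signal + turn["from"] + ": " + turn["value"] + end_signal
    # Step 2: the concatenated string is then tokenized (step 3), and the
    # target is a deepcopy of the ids with masked roles set to IGNORE_INDEX
    # (step 4, handled by _mask_targets).
    return text
```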

bridge.data.datasets.utils._mask_targets(
target,
tokenized_lens,
speakers,
header_len,
s_ids,
tokenizer,
mask_role,
gtype,
name_end_token_ids,
special_tokens,
label_start_ids,
num_turn_start_tokens,
)#

This function masks the tokens so the loss is computed only on the non-masked role’s responses. For ‘TEXT_TO_VALUE’ type, the loss is computed on the value attributes.

Parameters:
  • target (Tensor) – input ids.

  • tokenized_lens (List[int]) – lengths of each turn.

  • speakers (List[str]) – speakers of each turn.

  • header_len (int) – length of the system prompt.

  • s_ids (List[Tensor]) – tokenized ids of each turn.

  • tokenizer (MegatronTokenizer) – tokenizer object.

  • mask_role (str) – the speaker id to be masked from loss computation.

  • gtype (str) – either ‘TEXT_TO_VALUE’ or ‘VALUE_TO_TEXT’.

  • name_end_token_ids (int) – end-of-name token ids.

  • special_tokens (dict) – special tokens used for the chat prompt.

  • label_start_ids (list) – list of label start token ids.

  • num_turn_start_tokens (int) – number of tokens in the turn_start string.
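
A toy sketch of the core masking idea (the IGNORE_INDEX value and the turn-boundary arithmetic are assumptions; the real function also handles the name/label tokens and the gtype cases):

```python
import torch

IGNORE_INDEX = -100  # assumed sentinel; the module defines its own

def mask_role_turns(target, tokenized_lens, speakers, header_len, mask_role):
    # The system header never contributes to the loss.
    target[:header_len] = IGNORE_INDEX
    cur = header_len
    for turn_len, speaker in zip(tokenized_lens, speakers):
        if speaker == mask_role:
            # Exclude the masked role's tokens from the loss.
            target[cur:cur + turn_len] = IGNORE_INDEX
        cur += turn_len
    return target
```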

bridge.data.datasets.utils._get_header_conversation_type_mask_role(source, special_tokens)#
bridge.data.datasets.utils._add_speaker_and_signal(
header,
source,
mask_role,
gtype,
special_tokens,
)#
bridge.data.datasets.utils._response_value_formater(label, label_start, end_signal)#
bridge.data.datasets.utils._identify_start_index_of_subsequence(subsequence, sequence)#

Find the location of a small tensor (subsequence) within a large tensor (sequence). E.g. subsequence = [1, 3], sequence = [2, 3, 1, 3] returns 2; subsequence = [3, 2], sequence = [2, 3, 1, 3] returns -1.

Parameters:
  • subsequence (Tensor) – the small tensor to search for.

  • sequence (Tensor) – the large tensor to search in.
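
A straightforward implementation matching the documented behavior (a sketch; the module's version may differ in detail):

```python
import torch

def find_subsequence_start(subsequence: torch.Tensor, sequence: torch.Tensor) -> int:
    n, m = len(sequence), len(subsequence)
    for i in range(n - m + 1):
        # Compare the window starting at i against the subsequence.
        if torch.equal(sequence[i:i + m], subsequence):
            return i
    return -1

# find_subsequence_start(torch.tensor([1, 3]), torch.tensor([2, 3, 1, 3]))  # 2
# find_subsequence_start(torch.tensor([3, 2]), torch.tensor([2, 3, 1, 3]))  # -1
```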

bridge.data.datasets.utils._build_memmap_index_files(
newline_int,
build_index_fn,
fn,
index_mapping_dir: str,
)#

Helper function to build an index file.

bridge.data.datasets.utils._index_fn(fn: str, index_mapping_dir: str) str#

Return base file name of index files.

This returns the base file name associated with the specified index files; suffixes such as .npy or .info are appended to it.

The parent directory is created if it does not already exist.

fn may be specified in multiple ways:

  1. file name: data.jsonl,

  2. relative path to a file: relative/path/to/data.jsonl,

  3. absolute path to a file: /absolute/path/to/data.jsonl.

This function returns paths in the pattern of:

  1. /path/to/index_mapping_dir/data.jsonl.idx

  2. /path/to/index_mapping_dir/relative/path/to/data.jsonl.idx

  3. /path/to/index_mapping_dir/absolute/path/to/data.jsonl.idx

Parameters:
  • fn – filename to get base name for.

  • index_mapping_dir – directory to save the index mapping to. If None, will write to the same folder as the dataset.
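
A sketch of the path derivation the three cases above describe (assuming the '.idx' suffix from __idx_suffix__; the helper name is illustrative):

```python
import os
from typing import Optional

def derive_index_base(fn: str, index_mapping_dir: Optional[str]) -> str:
    if index_mapping_dir is None:
        # No mapping dir: write next to the dataset itself.
        return fn + ".idx"
    # Strip any leading '/' so absolute paths nest under the mapping dir.
    idx_fn = os.path.join(index_mapping_dir, fn.lstrip("/")) + ".idx"
    # The parent directory is created if it does not already exist.
    os.makedirs(os.path.dirname(idx_fn), exist_ok=True)
    return idx_fn
```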

bridge.data.datasets.utils._index_file_exists(idx_fn)#

Helper function to test whether an index file exists.

bridge.data.datasets.utils._deallocate_indexed_dataset_memory(indexed_dataset)#

Deallocate memory of an IndexedDataset.