bridge.data.datasets.packed_parquet#

Packed Parquet dataset support for SFT training.

This module provides GPTSFTPackedParquetDataset, which reads packed sequence data from Parquet files as an alternative to the NumPy-based GPTSFTPackedDataset.

Supports multiple files via:

  • Single file: “data.idx.parquet”, “shard_0.parquet”

  • Glob pattern: “data*.idx.parquet”, “shard_*.parquet”

  • Directory: “/path/to/data/” (globs for *.parquet and *.pq)

Key functions:

  • is_packed_parquet_spec(): Check if a spec refers to packed Parquet data

  • resolve_packed_parquet_paths(): Resolve a spec to actual file paths

Module Contents#

Classes#

GPTSFTPackedParquetDataset

Dataset for packed sequences stored in Parquet format.

Functions#

is_packed_parquet_file

Check if a path refers to a packed Parquet file or pattern.

is_packed_parquet_spec

Check if a spec refers to a packed Parquet source (file, directory, or glob).

_lazy_import_pyarrow

Lazily import pyarrow and raise a clear error if not installed.

_is_parquet_file

Check if a path refers to any Parquet file.

_resolve_parquet_paths

Resolve a file path specification to a list of actual file paths.

resolve_packed_parquet_paths

Resolve a packed parquet spec to a list of shard file paths.

write_packed_parquet

Write packed sequence data to a Parquet file.

Data#

API#

bridge.data.datasets.packed_parquet.logger#

‘getLogger(…)’

bridge.data.datasets.packed_parquet.REQUIRED_COLUMNS#

None

bridge.data.datasets.packed_parquet.is_packed_parquet_file(path) bool#

Check if a path refers to a packed Parquet file or pattern.

Parameters:

path – A Path object or string path.

Returns:

True if the path ends with .idx.parquet or .idx.pq, or contains a glob pattern that would match such files.

bridge.data.datasets.packed_parquet.is_packed_parquet_spec(spec: str | pathlib.Path) bool#

Check if a spec refers to a packed Parquet source (file, directory, or glob).

This predicate reflects what the dataset loader supports in packed mode:

  • Single .parquet/.idx.parquet/.idx.pq files

  • Glob patterns ending in .parquet/.idx.parquet/.idx.pq

  • Directories containing parquet files

Parameters:

spec – A path specification (file, directory, or glob pattern).

Returns:

True if the spec could refer to packed Parquet data.
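The suffix-and-directory checks described above can be sketched as a standalone predicate. This is a simplified reimplementation for illustration only, not the module's actual code:

```python
from pathlib import Path

# Suffixes the loader treats as Parquet, per the docs above.
_PARQUET_SUFFIXES = (".parquet", ".pq")


def looks_like_packed_parquet_spec(spec: str) -> bool:
    """Simplified stand-in for is_packed_parquet_spec()."""
    lower = str(spec).lower()
    # Single file or glob pattern ending in a recognized Parquet suffix.
    if lower.endswith(_PARQUET_SUFFIXES):
        return True
    # A directory may contain *.parquet / *.pq shards.
    return Path(spec).is_dir()
```

For example, both "data.idx.parquet" and "shard_*.idx.pq" pass the suffix check, while "data.jsonl" does not.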

bridge.data.datasets.packed_parquet._lazy_import_pyarrow()#

Lazily import pyarrow and raise a clear error if not installed.

bridge.data.datasets.packed_parquet._is_parquet_file(path: str) bool#

Check if a path refers to any Parquet file.

Parameters:

path – A string path.

Returns:

True if the path ends with .parquet or .pq (case-insensitive).

bridge.data.datasets.packed_parquet._resolve_parquet_paths(file_path: str) list[str]#

Resolve a file path specification to a list of actual file paths.

Supports:

  • Single file: “data.idx.parquet”, “shard_0.parquet”

  • Glob pattern: “data*.idx.parquet”, “shard_*.parquet”

  • Directory: “/path/to/data/” (globs for *.parquet and *.pq)

Parameters:

file_path – Path specification (file, glob pattern, or directory).

Returns:

Sorted list of resolved file paths.

Raises:

ValueError – If no matching files are found.
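The three resolution modes above (file, glob, directory) can be sketched as follows. This is an illustrative reimplementation; the real _resolve_parquet_paths may handle additional cases such as remote storage backends:

```python
import glob
import os


def resolve_parquet_paths_sketch(file_path: str) -> list[str]:
    """Illustrative stand-in for _resolve_parquet_paths()."""
    if os.path.isdir(file_path):
        # Directory: gather *.parquet and *.pq shards inside it.
        matches = glob.glob(os.path.join(file_path, "*.parquet"))
        matches += glob.glob(os.path.join(file_path, "*.pq"))
    elif any(ch in file_path for ch in "*?["):
        # Glob pattern: expand it directly.
        matches = glob.glob(file_path)
    else:
        # Single file: must exist on disk.
        matches = [file_path] if os.path.isfile(file_path) else []
    if not matches:
        raise ValueError(f"No Parquet files found for spec: {file_path!r}")
    return sorted(matches)
```

Sorting the result gives a deterministic shard order regardless of filesystem enumeration order.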

bridge.data.datasets.packed_parquet.resolve_packed_parquet_paths(spec: str | pathlib.Path) list[str]#

Resolve a packed parquet spec to a list of shard file paths.

Public wrapper around the internal _resolve_parquet_paths function. Use this to validate and resolve packed parquet specs before dataset creation.

Supports:

  • Single file: “data.idx.parquet”, “shard_0.parquet”

  • Glob pattern: “data*.idx.parquet”, “shard_*.parquet”

  • Directory: “/path/to/data/” (globs for *.parquet and *.pq)

Parameters:

spec – Path specification (file, glob pattern, or directory).

Returns:

Sorted list of resolved file paths.

Raises:

ValueError – If no matching files are found.

bridge.data.datasets.packed_parquet.write_packed_parquet(
rows: list[dict],
output_path: str | pathlib.Path,
row_group_size: int = 500,
) None#

Write packed sequence data to a Parquet file.

Parameters:
  • rows – List of dicts with keys ‘input_ids’, ‘loss_mask’, ‘seq_start_id’. This is the output format of fill_packing_strategy().

  • output_path – Path to write the Parquet file.

  • row_group_size – Number of rows per row group (default 500).
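The rows layout expected here (the fill_packing_strategy() output format) can be illustrated with a minimal, pyarrow-free sketch; the token IDs below are made up:

```python
# A single packed row: two sub-sequences concatenated into one pack.
# seq_start_id marks where each sub-sequence begins within input_ids.
row = {
    "input_ids": [101, 7, 8, 102, 101, 9, 102],  # hypothetical token IDs
    "seq_start_id": [0, 4],                      # sub-sequence start offsets
    "loss_mask": [0, 1, 1, 1, 0, 1, 1],          # per-token 0/1 mask
}

rows = [row]  # a list of such dicts is what write_packed_parquet() consumes

# Invariants the writer relies on:
assert set(row) == {"input_ids", "seq_start_id", "loss_mask"}
assert len(row["loss_mask"]) == len(row["input_ids"])
assert row["seq_start_id"][0] == 0
```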

class bridge.data.datasets.packed_parquet.GPTSFTPackedParquetDataset(
file_path: str,
tokenizer: megatron.bridge.training.tokenizers.tokenizer.MegatronTokenizer,
return_cu_seqlen: bool = True,
pad_cu_seqlens: bool = False,
pack_metadata_file_path: str | None = None,
**kwargs,
)#

Bases: megatron.bridge.data.datasets.sft.GPTSFTPackedDataset

Dataset for packed sequences stored in Parquet format.

This class reads packed training data from Parquet files with the naming convention *.idx.parquet or *.idx.pq. It inherits from GPTSFTPackedDataset to reuse the collate_fn() and loss-mask semantics.

Supports multiple files via:

  • Single file: “data.idx.parquet”

  • Glob pattern: “data*.idx.parquet” or “shard_*.idx.pq”

  • Directory: “/path/to/data/” (globs for *.idx.parquet and *.idx.pq)

The Parquet file(s) must contain the following columns:

  • input_ids: list - Token IDs for the packed sequence

  • seq_start_id: list - Start offsets for each sub-sequence within the pack

  • loss_mask: list - Per-token loss mask (0 or 1), same length as input_ids


Example

Single file:

  dataset = GPTSFTPackedParquetDataset(
      file_path="packed_data.idx.parquet",
      tokenizer=tokenizer,
  )

Multiple files via glob:

  dataset = GPTSFTPackedParquetDataset(
      file_path="data/shard_*.idx.parquet",
      tokenizer=tokenizer,
  )

Initialization

Initialize the packed Parquet dataset.

Parameters:
  • file_path

    Path to packed Parquet file(s). Supports:

    • Single file: “data.idx.parquet”

    • Glob pattern: “data*.idx.parquet”

    • Directory: “/path/to/data/”

  • tokenizer – The tokenizer to use.

  • return_cu_seqlen – Whether to return cu_seqlen for THD attention kernel.

  • pad_cu_seqlens – Whether to pad cu_seqlens for cudagraphs compatibility.

  • pack_metadata_file_path – Path to the metadata JSON file for pad_cu_seqlens.

  • **kwargs – Additional arguments passed to parent class.

_load_dataset()#

Load Parquet metadata from all files and validate schemas.

This method:

  1. Resolves the file path specification to actual files

  2. Reads metadata from each file (not actual data)

  3. Validates schemas contain required columns

  4. Builds cumulative indices for efficient row lookups

The actual Parquet files are opened lazily in _ensure_reader() to survive DataLoader worker forking.

static validate_row(
idx: int,
input_ids: list,
loss_mask: list,
seq_start_id: list,
) None#

Validate packed row invariants.

This is NOT called in the training hot path for performance reasons. Use it during data preparation or for debugging.

Parameters:
  • idx – Row index (for error messages).

  • input_ids – Token IDs for the packed sequence.

  • loss_mask – Per-token loss mask.

  • seq_start_id – Start offsets for each sub-sequence.

Raises:

ValueError – If any invariant is violated.
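The invariants listed above can be sketched as explicit checks. This is illustrative only; the module's actual validate_row may check more or differ in detail:

```python
def check_packed_row(idx: int, input_ids: list, loss_mask: list, seq_start_id: list) -> None:
    """Illustrative invariant checks for a packed row."""
    if len(loss_mask) != len(input_ids):
        raise ValueError(f"row {idx}: loss_mask length != input_ids length")
    if not seq_start_id or seq_start_id[0] != 0:
        raise ValueError(f"row {idx}: seq_start_id must start at offset 0")
    if seq_start_id != sorted(seq_start_id):
        raise ValueError(f"row {idx}: seq_start_id must be non-decreasing")
    if seq_start_id[-1] >= len(input_ids):
        raise ValueError(f"row {idx}: start offset past end of input_ids")
    if any(m not in (0, 1) for m in loss_mask):
        raise ValueError(f"row {idx}: loss_mask values must be 0 or 1")
```

As noted above, checks like these belong in data preparation or debugging, not in the per-sample training path.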

_ensure_reader(file_idx: int)#

Lazily open a Parquet file for reading.

Parameters:

file_idx – Index of the file in self._parquet_paths.

This method is called before accessing data and creates the ParquetFile reader if it doesn’t exist. This lazy initialization ensures the reader survives DataLoader worker forking (each worker creates its own readers).

close() None#

Close all open Parquet file handles.

This method should be called when the dataset is no longer needed to release file handles, especially when using MSC backends. It is also called automatically by __del__().

__del__() None#

Cleanup on deletion.

_build_samples_mapping()#

Build epoch-level sample mapping for shuffling.

Mirrors GPTSFTPackedDataset._build_samples_mapping() exactly, using self._num_rows instead of len(self.indexed_dataset).

__len__()#

Return the number of samples in the dataset.

_locate_row(global_idx: int) tuple[int, int, int]#

Map a global row index to (file_idx, row_group_id, row_in_group).

Parameters:

global_idx – Global row index across all files.

Returns:

Tuple of (file_idx, row_group_id, row_in_group).
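The mapping from a global index into per-file coordinates typically uses a binary search over cumulative row counts. The sketch below resolves only (file_idx, row_in_file); the real method additionally resolves the row group within the file:

```python
import bisect


def locate_row(global_idx: int, rows_per_file: list[int]) -> tuple[int, int]:
    """Sketch: map a global row index to (file_idx, row_in_file)."""
    cumulative = []
    total = 0
    for n in rows_per_file:
        total += n
        cumulative.append(total)  # e.g. [3, 8, 10] for rows_per_file [3, 5, 2]
    # bisect_right finds the first file whose cumulative count exceeds global_idx.
    file_idx = bisect.bisect_right(cumulative, global_idx)
    prev = cumulative[file_idx - 1] if file_idx > 0 else 0
    return file_idx, global_idx - prev
```

With shards of 3, 5, and 2 rows, global index 3 lands at the start of the second file, i.e. (1, 0).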

__getitem__(idx: int) dict#

Get a packed sample by index.

Parameters:

idx – Sample index. If samples_mapping exists, this is mapped to the actual row index. Negative indices return samples with zeroed loss_mask.

Returns:

dict with the following keys:

  • input_ids: list[int] - Token IDs

  • seq_boundaries: list[int] - Sequence boundaries (derived from seq_start_id)

  • loss_mask: list[int] - Per-token loss mask

Return type:

dict
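One plausible derivation of seq_boundaries from seq_start_id, assuming boundaries delimit consecutive sub-sequences and the last sub-sequence ends at the pack length (a sketch, not necessarily the module's exact formula):

```python
def seq_boundaries_from_starts(seq_start_id: list[int], num_tokens: int) -> list[int]:
    # Append the pack length so consecutive boundary pairs delimit each
    # sub-sequence: sub-sequence i spans input_ids[b[i]:b[i + 1]].
    return list(seq_start_id) + [num_tokens]
```

For a 7-token pack with seq_start_id [0, 4], this yields boundaries [0, 4, 7], i.e. sub-sequences at [0:4] and [4:7].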