nemo_automodel.components.datasets.llm.delta_lake_dataset

Delta Lake dataset support for streaming instruction-tuning datasets.

This module provides support for reading Delta Lake tables from Databricks or local storage as streaming datasets. It integrates with the existing ColumnMappedTextInstructionDataset infrastructure.

Tables with deletion vectors (Databricks Runtime 15.4+) are supported via Spark when running on a Databricks runtime and, optionally, via the Databricks SQL Connector for Unity Catalog access outside of Spark.

Module Contents

Classes

| Name | Description |
| --- | --- |
| DeltaLakeDataset | HuggingFace datasets-compatible wrapper for Delta Lake tables. |
| DeltaLakeIterator | Iterator that yields rows from a Delta Lake table. |
| _LimitedDeltaLakeDataset | Internal wrapper to limit a Delta Lake dataset to n samples. |

Functions

| Name | Description |
| --- | --- |
| _build_uc_table_fqn | Build a fully-qualified UC table name with safe quoting. |
| _check_databricks_sql_available | Check if databricks-sql-connector is available for Unity Catalog access. |
| _check_delta_reader_available | Check if any Delta Lake reader is available (deltalake, Spark, or databricks-sql). |
| _check_deltalake_available | Check if the deltalake package is available. |
| _check_pyspark_available | Check if PySpark is available (used as a fallback on Databricks for deletion vectors). |
| _get_spark_session | Get an active Spark session if available (Databricks notebooks/jobs). |
| _is_deletion_vectors_error | Return True if e looks like the deltalake ‘deletionVectors’ unsupported-reader-features error. |
| _is_location_overlap_error | Return True if e looks like Databricks UC managed-storage overlap. |
| _is_unity_catalog_path | Check if path refers to a Unity Catalog table (catalog.schema.table format). |
| _normalize_delta_path | Normalize a Delta Lake path by removing the delta:// prefix if present. |
| _parse_unity_storage_ids | Parse Unity Catalog managed storage IDs from a __unitystorage path. |
| _quote_sql_ident | Quote an identifier for Spark SQL (handles embedded backticks). |
| _resolve_uc_table_from_unity_storage_path | If path looks like UC managed storage, try to resolve to catalog.schema.table. |
| _try_resolve_uc_table_from_system_tables | Best-effort reverse lookup of a UC table name via Databricks system tables. |
| is_delta_lake_path | Check if a path refers to a Delta Lake table. |

Data

_DATABRICKS_SQL_AVAILABLE

_DELTALAKE_AVAILABLE

_PYSPARK_AVAILABLE

_UNITY_STORAGE_TABLE_PATH_RE

logger

API

class nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset(
table_path: str,
columns: typing.Optional[list] = None,
storage_options: typing.Optional[typing.Dict[str, str]] = None,
version: typing.Optional[int] = None,
sql_query: typing.Optional[str] = None
)

HuggingFace datasets-compatible wrapper for Delta Lake tables.

This class integrates with the HuggingFace datasets library and supports sharding, shuffling, and epoch setting for distributed training scenarios.

Parameters:

table_path
str

Path to the Delta Lake table.

columns
Optional[list], defaults to None

Optional list of column names to read.

storage_options
Optional[Dict[str, str]], defaults to None

Optional dict of storage options for cloud authentication.

version
Optional[int], defaults to None

Optional specific version of the Delta table to read.

sql_query
Optional[str], defaults to None

Optional SQL query to read the table and/or create alias columns.

_data_iterator

_epoch: int = 0

_shard_info: Optional[tuple] = None

_shuffle_info: Optional[tuple] = None

storage_options = storage_options or {}

table_path = _normalize_delta_path(table_path)

nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset.__getitem__(
idx: int
) -> typing.Dict[str, typing.Any]

Get a specific row by index.

nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset.__iter__() -> typing.Iterator[typing.Dict[str, typing.Any]]

Iterate over rows in the dataset.

nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset.__len__() -> int

Return the number of rows in the table.

nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset.set_epoch(
epoch: int
) -> None

Set the current epoch for deterministic shuffling.

Parameters:

epoch
int

The epoch number.

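nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset.shard(
num_shards: int,
index: int
)

Shard the dataset for distributed processing.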

Parameters:

num_shards
int

Total number of shards.

index
int

Index of this shard (0-based).

Returns: DeltaLakeDataset

Self for method chaining.

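nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset.shuffle(
buffer_size: int = 1000,
seed: typing.Optional[int] = None
)

Configure shuffling for the dataset.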

Note: For streaming Delta Lake datasets, shuffling is performed on-the-fly using a shuffle buffer. The actual shuffling happens during iteration.

Parameters:

buffer_size
int, defaults to 1000

Size of the shuffle buffer.

seed
Optional[int], defaults to None

Random seed for reproducibility.

Returns: DeltaLakeDataset

Self for method chaining.
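
The shuffle-buffer idea mentioned in the note above can be sketched in plain Python. This is a generic illustration of the technique, not the module's code: the function name `shuffle_buffer` is hypothetical, and the way `seed` and `epoch` are combined here (simple addition) is an assumption made only so that each epoch gets a different but reproducible order.

```python
import random
from typing import Any, Iterable, Iterator, List, Optional


def shuffle_buffer(
    rows: Iterable[Any],
    buffer_size: int = 1000,
    seed: Optional[int] = None,
    epoch: int = 0,
) -> Iterator[Any]:
    """Yield rows in a locally shuffled order using a fixed-size buffer."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be at least 1")
    # Assumption: seed + epoch gives a reproducible, per-epoch random stream.
    rng = random.Random(None if seed is None else seed + epoch)
    buffer: List[Any] = []
    for row in rows:
        if len(buffer) < buffer_size:
            # Still filling the buffer; nothing is emitted yet.
            buffer.append(row)
            continue
        # Buffer is full: emit a random element and put the new row in its slot.
        idx = rng.randrange(buffer_size)
        yield buffer[idx]
        buffer[idx] = row
    # Source exhausted: drain the remaining rows in random order.
    rng.shuffle(buffer)
    yield from buffer
```

Because only `buffer_size` rows are held in memory, the shuffle is approximate: a row can move at most a bounded distance ahead of its original position, which is the usual trade-off for streaming data.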

nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset.take(
n: int
)

Limit the dataset to the first n samples.

Parameters:

n
int

Number of samples to take.

Returns: DeltaLakeDataset

A new DeltaLakeDataset limited to n samples.
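
Limiting a streaming source to its first n rows can be illustrated with `itertools.islice`. This is a minimal sketch of the behavior described above; the helper name `take_rows` is hypothetical, and the module's own wrapper may be implemented differently.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator


def take_rows(rows: Iterable[Dict[str, Any]], n: int) -> Iterator[Dict[str, Any]]:
    """Yield at most the first n rows without consuming the rest of the source."""
    return islice(rows, n)
```

Since `islice` is lazy, the underlying table is not read past the requested number of rows.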

class nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeIterator(
table_path: str,
columns: typing.Optional[list] = None,
storage_options: typing.Optional[typing.Dict[str, str]] = None,
batch_size: int = 1024,
version: typing.Optional[int] = None,
sql_query: typing.Optional[str] = None,
shard_info: typing.Optional[tuple[int, int]] = None
)

Iterator that yields rows from a Delta Lake table.

This class provides a streaming interface for Delta Lake tables, yielding rows as dictionaries one at a time to support memory-efficient iteration over large tables.

Supports tables with deletion vectors (Databricks Runtime 15.4+) via the Spark backend (recommended when running in Databricks notebooks/jobs).

Parameters:

table_path
str

Path to the Delta Lake table.

columns
Optional[list], defaults to None

Optional list of column names to read. If None, reads all columns.

storage_options
Optional[Dict[str, str]], defaults to None

Optional storage options for cloud storage access.

batch_size
int, defaults to 1024

Number of rows to read at a time.

version
Optional[int], defaults to None

Optional version of the table to read.

sql_query
Optional[str], defaults to None

Optional SQL query to read the table and/or create alias columns.

shard_info
Optional[tuple[int, int]], defaults to None

Optional sharding configuration (num_shards, shard_index). When provided, only rows where row_idx % num_shards == shard_index are yielded.

_shard_info: Optional[tuple[int, int]] = None

storage_options = storage_options or {}

table_path = _normalize_delta_path(table_path)

nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeIterator.__iter__() -> typing.Iterator[typing.Dict[str, typing.Any]]

Iterate over rows in the Delta Lake table.

nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeIterator._add_env_storage_options()

Add storage options from environment variables if not already set.

nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeIterator._iter_all_rows() -> typing.Iterator[typing.Dict[str, typing.Any]]

Iterate over all rows (no sharding).

nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeIterator._iter_with_databricks_sql() -> typing.Iterator[typing.Dict[str, typing.Any]]

Iterate using Databricks SQL Connector (for Unity Catalog tables).

This is the recommended method for accessing Unity Catalog tables as it handles authentication, deletion vectors, and column mapping natively.

nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeIterator._iter_with_deltalake() -> typing.Iterator[typing.Dict[str, typing.Any]]

Iterate using the deltalake library.

nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeIterator._iter_with_spark() -> typing.Iterator[typing.Dict[str, typing.Any]]

Iterate using Spark (supports deletion vectors on Databricks).

This backend requires a working SparkSession (e.g., Databricks notebooks/jobs). It is the recommended fallback for Delta tables that use deletion vectors.

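nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeIterator.shard(
num_shards: int,
index: int
)

Shard the iterator for distributed processing.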

Parameters:

num_shards
int

Total number of shards.

index
int

Index of this shard (0-based).
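
The modulo rule documented for shard_info (a row is kept when row_idx % num_shards == shard_index) can be sketched as follows. The function name `iter_shard` is hypothetical and the rows are plain dictionaries standing in for table rows; this shows the selection rule only, not how the module reads data.

```python
from typing import Any, Dict, Iterable, Iterator, Tuple


def iter_shard(
    rows: Iterable[Dict[str, Any]],
    shard_info: Tuple[int, int],
) -> Iterator[Dict[str, Any]]:
    """Yield only the rows that belong to one shard, using the modulo rule."""
    num_shards, shard_index = shard_info
    for row_idx, row in enumerate(rows):
        # Keep every num_shards-th row, offset by this shard's index.
        if row_idx % num_shards == shard_index:
            yield row
```

With this rule every row lands in exactly one shard, so the shards are disjoint and together cover the whole table.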

class nemo_automodel.components.datasets.llm.delta_lake_dataset._LimitedDeltaLakeDataset(
base: nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset,
limit: int
)

Internal wrapper to limit a Delta Lake dataset to n samples.

nemo_automodel.components.datasets.llm.delta_lake_dataset._LimitedDeltaLakeDataset.__iter__() -> typing.Iterator[typing.Dict[str, typing.Any]]

nemo_automodel.components.datasets.llm.delta_lake_dataset._LimitedDeltaLakeDataset.set_epoch(
epoch: int
) -> None

nemo_automodel.components.datasets.llm.delta_lake_dataset._LimitedDeltaLakeDataset.shard(
num_shards: int,
index: int
)

nemo_automodel.components.datasets.llm.delta_lake_dataset._LimitedDeltaLakeDataset.shuffle(
buffer_size: int = 1000,
seed: typing.Optional[int] = None
)

nemo_automodel.components.datasets.llm.delta_lake_dataset._LimitedDeltaLakeDataset.take(
n: int
)

nemo_automodel.components.datasets.llm.delta_lake_dataset._build_uc_table_fqn(
catalog: str,
schema: str,
table: str
) -> str

Build a fully-qualified UC table name with safe quoting.

nemo_automodel.components.datasets.llm.delta_lake_dataset._check_databricks_sql_available() -> bool

Check if databricks-sql-connector is available for Unity Catalog access.

nemo_automodel.components.datasets.llm.delta_lake_dataset._check_delta_reader_available() -> bool

Check if any Delta Lake reader is available (deltalake, Spark, or databricks-sql).

nemo_automodel.components.datasets.llm.delta_lake_dataset._check_deltalake_available() -> bool

Check if the deltalake package is available.
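
The module-level flags listed under Data are typed Optional[bool] and default to None, which suggests a check that is performed once and then cached. A minimal sketch of that pattern, assuming the check is a simple import lookup (the function name `check_deltalake_available` and the use of `importlib.util.find_spec` are illustrative choices, not confirmed details of the module):

```python
import importlib.util
from typing import Optional

# None means "not checked yet"; True/False is the cached result.
_DELTALAKE_AVAILABLE: Optional[bool] = None


def check_deltalake_available() -> bool:
    """Return whether the deltalake package can be imported, caching the answer."""
    global _DELTALAKE_AVAILABLE
    if _DELTALAKE_AVAILABLE is None:
        # find_spec returns None when a top-level package is not installed.
        _DELTALAKE_AVAILABLE = importlib.util.find_spec("deltalake") is not None
    return _DELTALAKE_AVAILABLE
```

Caching the result avoids repeating the lookup each time a reader backend is selected.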

nemo_automodel.components.datasets.llm.delta_lake_dataset._check_pyspark_available() -> bool

Check if PySpark is available (used as a fallback on Databricks for deletion vectors).

nemo_automodel.components.datasets.llm.delta_lake_dataset._get_spark_session() -> typing.Optional[typing.Any]

Get an active Spark session if available (Databricks notebooks/jobs).

Returns: Optional[Any]

A SparkSession instance if available, else None.

nemo_automodel.components.datasets.llm.delta_lake_dataset._is_deletion_vectors_error(
e: BaseException
) -> bool

Return True if e looks like the deltalake ‘deletionVectors’ unsupported-reader-features error.
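
A check of this kind is typically a string match on the exception message. The sketch below assumes the message mentions the deletionVectors reader feature by name; the function name and the exact matching rule are hypothetical, and the real helper may inspect the error differently.

```python
def looks_like_deletion_vectors_error(e: BaseException) -> bool:
    """Heuristic: does the error message mention the deletionVectors feature?"""
    # Assumption: a case-insensitive substring match is enough to recognize it.
    return "deletionvectors" in str(e).lower()
```

A positive result could then be used to switch to a backend that supports deletion vectors, such as Spark.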

nemo_automodel.components.datasets.llm.delta_lake_dataset._is_location_overlap_error(
e: BaseException
) -> bool

Return True if e looks like Databricks UC managed-storage overlap.

nemo_automodel.components.datasets.llm.delta_lake_dataset._is_unity_catalog_path(
path: str
) -> bool

Check if path refers to a Unity Catalog table (catalog.schema.table format).
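
One way to recognize the catalog.schema.table format is to require exactly three non-empty, dot-separated parts and no URI scheme or path separators. This is a sketch under those assumptions; the function name is hypothetical and the module's actual rule may be stricter or looser.

```python
def looks_like_unity_catalog_path(path: str) -> bool:
    """Heuristic check for a three-part catalog.schema.table name."""
    # Assumption: anything with a scheme or a path separator is a storage path.
    if "://" in path or "/" in path or "\\" in path:
        return False
    parts = path.split(".")
    # Exactly three parts, none of them empty.
    return len(parts) == 3 and all(parts)
```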

nemo_automodel.components.datasets.llm.delta_lake_dataset._normalize_delta_path(
path: str
) -> str

Normalize a Delta Lake path by removing the delta:// prefix if present.

Parameters:

path
str

The Delta Lake path.

Returns: str

The normalized path suitable for the deltalake library.
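
The prefix removal described above can be sketched in a few lines. The function name `normalize_delta_path` (without the leading underscore) is a stand-in for illustration, and any extra handling the module performs is not shown.

```python
def normalize_delta_path(path: str) -> str:
    """Strip a leading delta:// prefix, leaving other paths unchanged."""
    prefix = "delta://"
    if path.startswith(prefix):
        return path[len(prefix):]
    return path
```

For example, `delta:///data/my_table` becomes `/data/my_table`, while a path without the prefix is returned as-is.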

nemo_automodel.components.datasets.llm.delta_lake_dataset._parse_unity_storage_ids(
path: str
) -> typing.Optional[typing.Dict[str, str]]

Parse Unity Catalog managed storage IDs from a __unitystorage path.

Direct path access to these locations is blocked on Databricks (“LOCATION_OVERLAP”).

nemo_automodel.components.datasets.llm.delta_lake_dataset._quote_sql_ident(
ident: str
) -> str

Quote an identifier for Spark SQL (handles embedded backticks).
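
In Spark SQL, an identifier is quoted with backticks and an embedded backtick is escaped by doubling it. The sketch below applies that rule and then joins three quoted parts into a fully-qualified name, in the spirit of _build_uc_table_fqn. Both function names are stand-ins, and the module's helpers may add validation that is not shown here.

```python
def quote_sql_ident(ident: str) -> str:
    """Wrap an identifier in backticks, doubling any backticks inside it."""
    return "`" + ident.replace("`", "``") + "`"


def build_uc_table_fqn(catalog: str, schema: str, table: str) -> str:
    """Join three individually quoted parts into catalog.schema.table."""
    return ".".join(quote_sql_ident(part) for part in (catalog, schema, table))
```

Quoting each part separately keeps names that contain dots, spaces, or backticks from being misread as extra name components.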

nemo_automodel.components.datasets.llm.delta_lake_dataset._resolve_uc_table_from_unity_storage_path(
spark: typing.Any,
path: str
) -> typing.Optional[str]

If path looks like UC managed storage, try to resolve to catalog.schema.table.

nemo_automodel.components.datasets.llm.delta_lake_dataset._try_resolve_uc_table_from_system_tables(
spark: typing.Any,
table_id: typing.Optional[str] = None,
storage_location: typing.Optional[str] = None
) -> typing.Optional[str]

Best-effort reverse lookup of a UC table name via Databricks system tables.

nemo_automodel.components.datasets.llm.delta_lake_dataset.is_delta_lake_path(
path: str
) -> bool

Check if a path refers to a Delta Lake table.

A path is considered a Delta Lake path if any of the following is true:

  1. It starts with the “delta://” protocol prefix.
  2. It is a local directory containing a “_delta_log” subdirectory.
  3. It starts with “dbfs:/” (Databricks file system).

Parameters:

path
str

The path to check.

Returns: bool

True if the path is a Delta Lake table, False otherwise.
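
The three rules listed above can be sketched as a small predicate. The function name is hypothetical, and the order of the checks is an assumption chosen so that the cheap prefix tests run before the file-system lookup.

```python
import os


def looks_like_delta_lake_path(path: str) -> bool:
    """Apply the three documented rules for recognizing a Delta Lake path."""
    # Rule 1: explicit delta:// protocol prefix.
    if path.startswith("delta://"):
        return True
    # Rule 3: Databricks file system prefix.
    if path.startswith("dbfs:/"):
        return True
    # Rule 2: local directory that contains a _delta_log subdirectory.
    return os.path.isdir(os.path.join(path, "_delta_log"))
```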

nemo_automodel.components.datasets.llm.delta_lake_dataset._DATABRICKS_SQL_AVAILABLE: Optional[bool] = None

nemo_automodel.components.datasets.llm.delta_lake_dataset._DELTALAKE_AVAILABLE: Optional[bool] = None

nemo_automodel.components.datasets.llm.delta_lake_dataset._PYSPARK_AVAILABLE: Optional[bool] = None

nemo_automodel.components.datasets.llm.delta_lake_dataset._UNITY_STORAGE_TABLE_PATH_RE = re.compile('__unitystorage/catalogs/(?P<catalog_id>[0-9a-fA-F]{8}-[0-9a-fA-F]{4}...

nemo_automodel.components.datasets.llm.delta_lake_dataset.logger = logging.getLogger(__name__)