nemo_automodel.components.datasets.llm.delta_lake_dataset#

Delta Lake dataset support for streaming instruction-tuning datasets.

This module provides support for reading Delta Lake tables from Databricks or local storage as streaming datasets. It integrates with the existing ColumnMappedTextInstructionDataset infrastructure.

Tables with deletion vectors (Databricks Runtime 15.4+) are supported via Spark when running on a Databricks runtime, and optionally via the Databricks SQL Connector for Unity Catalog access outside of Spark.

Installation:

```bash
# For basic Delta Lake support (without deletion vectors)
pip install deltalake

# For Databricks Unity Catalog access without Spark (optional)
pip install databricks-sql-connector deltalake
```

Usage:

Local Delta tables:

```python
from nemo_automodel.components.datasets.llm.column_mapped_text_instruction_iterable_dataset import (
    ColumnMappedTextInstructionIterableDataset,
)

ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="delta:///path/to/delta_table",
    column_mapping={"question": "input", "answer": "output"},
    tokenizer=tokenizer,
)
```

Cloud storage (S3, Azure, GCS):

```python
ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="s3://bucket/path/to/delta_table",
    column_mapping={"question": "input", "answer": "output"},
    tokenizer=tokenizer,
    delta_storage_options={
        "AWS_ACCESS_KEY_ID": "...",
        "AWS_SECRET_ACCESS_KEY": "...",
    },
)
```

Databricks Unity Catalog (streaming):

```python
ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="delta://catalog.schema.table",  # Unity Catalog format
    column_mapping={"question": "input", "answer": "output"},
    tokenizer=tokenizer,
    delta_storage_options={
        "DATABRICKS_HOST": "https://your-workspace.databricks.com",
        "DATABRICKS_TOKEN": "dapi...",
        "DATABRICKS_WAREHOUSE_ID": "abc123def456",  # or DATABRICKS_HTTP_PATH
    },
)
```

Environment Variables: The following environment variables are detected automatically (see the sketch after this list):

- DATABRICKS_HOST / DATABRICKS_WORKSPACE_URL
- DATABRICKS_TOKEN / DATABRICKS_ACCESS_TOKEN
- DATABRICKS_HTTP_PATH / DATABRICKS_WAREHOUSE_ID / DATABRICKS_CLUSTER_ID
- AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, AWS_REGION
- AZURE_STORAGE_ACCOUNT_NAME, AZURE_STORAGE_ACCOUNT_KEY, AZURE_STORAGE_SAS_TOKEN
- GOOGLE_APPLICATION_CREDENTIALS
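
With credentials exported in the environment, the corresponding entries in delta_storage_options can be omitted. A minimal sketch (the values and bucket path are placeholders, and the tokenizer is assumed to be constructed elsewhere):

```python
import os

from nemo_automodel.components.datasets.llm.column_mapped_text_instruction_iterable_dataset import (
    ColumnMappedTextInstructionIterableDataset,
)

# Placeholder credentials; in practice these are usually set outside the process.
os.environ["AWS_ACCESS_KEY_ID"] = "..."
os.environ["AWS_SECRET_ACCESS_KEY"] = "..."
os.environ["AWS_REGION"] = "us-east-1"

# No delta_storage_options needed: the environment variables above are
# picked up automatically.
ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="s3://bucket/path/to/delta_table",
    column_mapping={"question": "input", "answer": "output"},
    tokenizer=tokenizer,
)
```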

Module Contents#

Classes#

DeltaLakeIterator

Iterator that yields rows from a Delta Lake table.

DeltaLakeDataset

HuggingFace datasets-compatible wrapper for Delta Lake tables.

_LimitedDeltaLakeDataset

Internal wrapper to limit a Delta Lake dataset to n samples.

Functions#

_check_deltalake_available

Check if the deltalake package is available.

_check_pyspark_available

Check if PySpark is available (used as a fallback on Databricks for deletion vectors).

_check_databricks_sql_available

Check if databricks-sql-connector is available for Unity Catalog access.

_check_delta_reader_available

Check if any Delta Lake reader is available (deltalake, Spark, or databricks-sql).

_is_deletion_vectors_error

Return True if e looks like the deltalake ‘deletionVectors’ unsupported-reader-features error.

_get_spark_session

Get an active Spark session if available (Databricks notebooks/jobs).

_is_unity_catalog_path

Check if path refers to a Unity Catalog table (catalog.schema.table format).

is_delta_lake_path

Check if a path refers to a Delta Lake table.

_normalize_delta_path

Normalize a Delta Lake path by removing the delta:// prefix if present.

_parse_unity_storage_ids

Parse Unity Catalog managed storage IDs from a __unitystorage path.

_quote_sql_ident

Quote an identifier for Spark SQL (handles embedded backticks).

_build_uc_table_fqn

Build a fully-qualified UC table name with safe quoting.

_try_resolve_uc_table_from_system_tables

Best-effort reverse lookup of a UC table name via Databricks system tables.

_resolve_uc_table_from_unity_storage_path

If path looks like UC managed storage, try to resolve to catalog.schema.table.

_is_location_overlap_error

Return True if e looks like Databricks UC managed-storage overlap.

Data#

API#

nemo_automodel.components.datasets.llm.delta_lake_dataset.logger#

‘getLogger(…)’

nemo_automodel.components.datasets.llm.delta_lake_dataset._DELTALAKE_AVAILABLE: Optional[bool]#

None

nemo_automodel.components.datasets.llm.delta_lake_dataset._DATABRICKS_SQL_AVAILABLE: Optional[bool]#

None

nemo_automodel.components.datasets.llm.delta_lake_dataset._PYSPARK_AVAILABLE: Optional[bool]#

None

nemo_automodel.components.datasets.llm.delta_lake_dataset._check_deltalake_available() bool#

Check if the deltalake package is available.

nemo_automodel.components.datasets.llm.delta_lake_dataset._check_pyspark_available() bool#

Check if PySpark is available (used as a fallback on Databricks for deletion vectors).

nemo_automodel.components.datasets.llm.delta_lake_dataset._check_databricks_sql_available() bool#

Check if databricks-sql-connector is available for Unity Catalog access.

nemo_automodel.components.datasets.llm.delta_lake_dataset._check_delta_reader_available() bool#

Check if any Delta Lake reader is available (deltalake, Spark, or databricks-sql).

nemo_automodel.components.datasets.llm.delta_lake_dataset._is_deletion_vectors_error(e: BaseException) bool#

Return True if e looks like the deltalake ‘deletionVectors’ unsupported-reader-features error.

nemo_automodel.components.datasets.llm.delta_lake_dataset._get_spark_session() Optional[Any]#

Get an active Spark session if available (Databricks notebooks/jobs).

Returns:

A SparkSession instance if available, else None.

nemo_automodel.components.datasets.llm.delta_lake_dataset._is_unity_catalog_path(path: str) bool#

Check if path refers to a Unity Catalog table (catalog.schema.table format).

nemo_automodel.components.datasets.llm.delta_lake_dataset.is_delta_lake_path(path: str) bool#

Check if a path refers to a Delta Lake table.

A path is considered a Delta Lake path if:

  1. It starts with “delta://” protocol prefix

  2. It’s a local directory containing a “_delta_log” subdirectory

  3. It starts with “dbfs:/” (Databricks file system)

Parameters:

path – The path to check.

Returns:

True if the path is a Delta Lake table, False otherwise.
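
For illustration, a usage sketch based on the rules above (the paths are made up):

```python
from nemo_automodel.components.datasets.llm.delta_lake_dataset import is_delta_lake_path

# Recognized by prefix alone:
assert is_delta_lake_path("delta:///data/my_delta_table")
assert is_delta_lake_path("dbfs:/mnt/datasets/my_delta_table")

# A plain local directory only qualifies if it contains a _delta_log subdirectory,
# so a directory without one returns False.
assert not is_delta_lake_path("/data/plain_parquet_dir")
```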

nemo_automodel.components.datasets.llm.delta_lake_dataset._normalize_delta_path(path: str) str#

Normalize a Delta Lake path by removing the delta:// prefix if present.

Parameters:

path – The Delta Lake path.

Returns:

The normalized path suitable for the deltalake library.
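
A minimal sketch of the documented behavior (paths are illustrative; the exact handling of edge cases is an assumption):

```python
from nemo_automodel.components.datasets.llm.delta_lake_dataset import _normalize_delta_path

# The delta:// prefix is stripped; other paths should pass through unchanged.
print(_normalize_delta_path("delta:///data/my_delta_table"))  # /data/my_delta_table
print(_normalize_delta_path("/data/my_delta_table"))          # /data/my_delta_table
```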

nemo_automodel.components.datasets.llm.delta_lake_dataset._UNITY_STORAGE_TABLE_PATH_RE#

‘compile(…)’

nemo_automodel.components.datasets.llm.delta_lake_dataset._parse_unity_storage_ids(
path: str,
) Optional[Dict[str, str]]#

Parse Unity Catalog managed storage IDs from a __unitystorage path.

Databricks Unity Catalog managed tables use internal cloud locations of the form …/__unitystorage/catalogs/<catalog_uuid>/tables/<table_uuid>. Direct path access to these locations is blocked on Databricks (“LOCATION_OVERLAP”).

nemo_automodel.components.datasets.llm.delta_lake_dataset._quote_sql_ident(ident: str) str#

Quote an identifier for Spark SQL (handles embedded backticks).

nemo_automodel.components.datasets.llm.delta_lake_dataset._build_uc_table_fqn(catalog: str, schema: str, table: str) str#

Build a fully-qualified UC table name with safe quoting.
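
For reference, Spark SQL quotes identifiers with backticks and escapes an embedded backtick by doubling it. A standalone sketch of that convention (not necessarily the module's exact implementation):

```python
def quote_spark_ident(ident: str) -> str:
    # Spark SQL convention: wrap in backticks, double any embedded backticks.
    return "`" + ident.replace("`", "``") + "`"


def build_table_fqn(catalog: str, schema: str, table: str) -> str:
    # build_table_fqn("main", "sales", "orders") -> "`main`.`sales`.`orders`"
    return ".".join(quote_spark_ident(part) for part in (catalog, schema, table))
```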

nemo_automodel.components.datasets.llm.delta_lake_dataset._try_resolve_uc_table_from_system_tables(
spark: Any,
*,
table_id: Optional[str] = None,
storage_location: Optional[str] = None,
) Optional[str]#

Best-effort reverse lookup of a UC table name via Databricks system tables.

nemo_automodel.components.datasets.llm.delta_lake_dataset._resolve_uc_table_from_unity_storage_path(
spark: Any,
path: str,
) Optional[str]#

If path looks like UC managed storage, try to resolve to catalog.schema.table.

nemo_automodel.components.datasets.llm.delta_lake_dataset._is_location_overlap_error(e: BaseException) bool#

Return True if e looks like Databricks UC managed-storage overlap.

class nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeIterator(
table_path: str,
columns: Optional[list] = None,
storage_options: Optional[Dict[str, str]] = None,
batch_size: int = 1024,
version: Optional[int] = None,
sql_query: Optional[str] = None,
shard_info: Optional[tuple[int, int]] = None,
)#

Iterator that yields rows from a Delta Lake table.

This class provides a streaming interface for Delta Lake tables, yielding rows as dictionaries one at a time to support memory-efficient iteration over large tables.

Supports tables with deletion vectors (Databricks Runtime 15.4+) via the Spark backend (recommended when running in Databricks notebooks/jobs).

Parameters:
  • table_path – Path to the Delta Lake table.

  • columns – Optional list of column names to read. If None, reads all columns.

  • storage_options – Optional storage options for cloud storage access.

  • batch_size – Number of rows to read at a time (default: 1024).

  • version – Optional version of the table to read.

  • sql_query – Optional SQL query to read the table and/or create alias columns.

  • shard_info – Optional sharding configuration (num_shards, shard_index). When provided, only rows where row_idx % num_shards == shard_index are yielded.

Initialization

_iter_all_rows() Iterator[Dict[str, Any]]#

Iterate over all rows (no sharding).

_add_env_storage_options()#

Add storage options from environment variables if not already set.

_iter_with_spark() Iterator[Dict[str, Any]]#

Iterate using Spark (supports deletion vectors on Databricks).

This backend requires a working SparkSession (e.g., Databricks notebooks/jobs). It is the recommended fallback for Delta tables that use deletion vectors.

_iter_with_deltalake() Iterator[Dict[str, Any]]#

Iterate using deltalake library.

_iter_with_databricks_sql() Iterator[Dict[str, Any]]#

Iterate using Databricks SQL Connector (for Unity Catalog tables).

This is the recommended method for accessing Unity Catalog tables as it handles authentication, deletion vectors, and column mapping natively.

__iter__() Iterator[Dict[str, Any]]#

Iterate over rows in the Delta Lake table.

Yields:

Dict containing column name to value mappings for each row.

shard(
num_shards: int,
index: int,
) nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeIterator#

Shard the iterator for distributed processing.

Parameters:
  • num_shards – Total number of shards.

  • index – Index of this shard (0-based).
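
For example, constructing the iterator and keeping every second row on rank 0 might look like this (the table path and column names are illustrative):

```python
from nemo_automodel.components.datasets.llm.delta_lake_dataset import DeltaLakeIterator

it = DeltaLakeIterator(
    table_path="/data/my_delta_table",
    columns=["question", "answer"],
    batch_size=1024,
)

# Keep only rows where row_idx % 2 == 0; rank 1 would use index=1.
rank0_it = it.shard(num_shards=2, index=0)

for row in rank0_it:
    # Each row is a dict mapping column names to values.
    print(row["question"], row["answer"])
    break
```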

class nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset(
table_path: str,
columns: Optional[list] = None,
storage_options: Optional[Dict[str, str]] = None,
version: Optional[int] = None,
sql_query: Optional[str] = None,
)#

HuggingFace datasets-compatible wrapper for Delta Lake tables.

This class provides better integration with the HuggingFace datasets library, supporting features like sharding, shuffling, and epoch setting for distributed training scenarios.

Parameters:
  • table_path – Path to the Delta Lake table.

  • columns – Optional list of column names to read.

  • storage_options – Optional dict of storage options for cloud authentication.

  • version – Optional specific version of the Delta table to read.

Initialization

__len__() int#

Return the number of rows in the table.

__getitem__(idx: int) Dict[str, Any]#

Get a specific row by index.

__iter__() Iterator[Dict[str, Any]]#

Iterate over rows in the dataset.

set_epoch(epoch: int) None#

Set the current epoch for deterministic shuffling.

Parameters:

epoch – The epoch number.

shard(
num_shards: int,
index: int,
) nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset#

Shard the dataset for distributed processing.

Parameters:
  • num_shards – Total number of shards.

  • index – Index of this shard (0-based).

Returns:

Self for method chaining.

shuffle(
buffer_size: int = 1000,
seed: Optional[int] = None,
) nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset#

Configure shuffling for the dataset.

Note: For streaming Delta Lake datasets, shuffling is performed on-the-fly using a shuffle buffer. The actual shuffling happens during iteration.

Parameters:
  • buffer_size – Size of the shuffle buffer.

  • seed – Random seed for reproducibility.

Returns:

Self for method chaining.

take(
n: int,
) nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset#

Limit the dataset to the first n samples.

Parameters:

n – Number of samples to take.

Returns:

A new DeltaLakeDataset limited to n samples.
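
A usage sketch tying these methods together (the table path and column names are illustrative):

```python
from nemo_automodel.components.datasets.llm.delta_lake_dataset import DeltaLakeDataset

ds = DeltaLakeDataset(
    table_path="/data/my_delta_table",
    columns=["question", "answer"],
)

print(len(ds))   # number of rows in the table
print(ds[0])     # random access by index

# shard() and shuffle() return self and can be chained; take() returns a new
# dataset limited to the first n samples.
subset = ds.shard(num_shards=2, index=0).shuffle(buffer_size=1000, seed=42).take(1000)

subset.set_epoch(0)  # deterministic shuffling per epoch
for row in subset:
    ...
```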

class nemo_automodel.components.datasets.llm.delta_lake_dataset._LimitedDeltaLakeDataset(
base: nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset,
limit: int,
)#

Internal wrapper to limit a Delta Lake dataset to n samples.

Initialization

__iter__() Iterator[Dict[str, Any]]#
set_epoch(epoch: int) None#
shard(num_shards: int, index: int)#
shuffle(buffer_size: int = 1000, seed: Optional[int] = None)#
take(n: int)#