nemo_automodel.components.datasets.llm.delta_lake_dataset#
Delta Lake dataset support for streaming instruction-tuning datasets.
This module provides support for reading Delta Lake tables from Databricks or local storage as streaming datasets. It integrates with the existing ColumnMappedTextInstructionDataset infrastructure.
Supports tables with deletion vectors (Databricks Runtime 15.4+) via Spark when running on Databricks, and optionally via the Databricks SQL Connector for Unity Catalog access outside of Spark.
Installation:
```bash
# For basic Delta Lake support (without deletion vectors)
pip install deltalake

# For Databricks Unity Catalog access without Spark (optional)
pip install databricks-sql-connector deltalake
```
Usage:

Local Delta tables:
```python
from nemo_automodel.components.datasets.llm.column_mapped_text_instruction_iterable_dataset import (
    ColumnMappedTextInstructionIterableDataset,
)

ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="delta:///path/to/delta_table",
    column_mapping={"question": "input", "answer": "output"},
    tokenizer=tokenizer,
)
```
Cloud storage (S3, Azure, GCS):
```python
ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="s3://bucket/path/to/delta_table",
    column_mapping={"question": "input", "answer": "output"},
    tokenizer=tokenizer,
    delta_storage_options={
        "AWS_ACCESS_KEY_ID": "...",
        "AWS_SECRET_ACCESS_KEY": "...",
    },
)
```
Databricks Unity Catalog (streaming):
```python
ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="delta://catalog.schema.table",  # Unity Catalog format
    column_mapping={"question": "input", "answer": "output"},
    tokenizer=tokenizer,
    delta_storage_options={
        "DATABRICKS_HOST": "https://your-workspace.databricks.com",
        "DATABRICKS_TOKEN": "dapi...",
        "DATABRICKS_WAREHOUSE_ID": "abc123def456",  # or DATABRICKS_HTTP_PATH
    },
)
```
Environment Variables: The following environment variables are detected automatically (see the sketch after this list):
- DATABRICKS_HOST / DATABRICKS_WORKSPACE_URL
- DATABRICKS_TOKEN / DATABRICKS_ACCESS_TOKEN
- DATABRICKS_HTTP_PATH / DATABRICKS_WAREHOUSE_ID / DATABRICKS_CLUSTER_ID
- AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, AWS_REGION
- AZURE_STORAGE_ACCOUNT_NAME, AZURE_STORAGE_ACCOUNT_KEY, AZURE_STORAGE_SAS_TOKEN
- GOOGLE_APPLICATION_CREDENTIALS
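For example, cloud credentials exported in the environment are merged into the storage options when they are not passed explicitly. A minimal sketch, assuming the bucket path, credential values, and tokenizer are placeholders:
```python
import os

from nemo_automodel.components.datasets.llm.column_mapped_text_instruction_iterable_dataset import (
    ColumnMappedTextInstructionIterableDataset,
)

# Placeholder credentials; in practice these would already be set in the shell
# or by the execution environment.
os.environ["AWS_ACCESS_KEY_ID"] = "..."
os.environ["AWS_SECRET_ACCESS_KEY"] = "..."
os.environ["AWS_REGION"] = "us-east-1"

# No delta_storage_options needed: the environment variables above are detected.
ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="s3://bucket/path/to/delta_table",
    column_mapping={"question": "input", "answer": "output"},
    tokenizer=tokenizer,
)
```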
Module Contents#
Classes#
| Class | Description |
|---|---|
| DeltaLakeIterator | Iterator that yields rows from a Delta Lake table. |
| DeltaLakeDataset | HuggingFace datasets-compatible wrapper for Delta Lake tables. |
| _LimitedDeltaLakeDataset | Internal wrapper to limit a Delta Lake dataset to n samples. |
Functions#
| Function | Description |
|---|---|
| _check_deltalake_available | Check if the deltalake package is available. |
| _check_pyspark_available | Check if PySpark is available (used as a fallback on Databricks for deletion vectors). |
| _check_databricks_sql_available | Check if databricks-sql-connector is available for Unity Catalog access. |
| _check_delta_reader_available | Check if any Delta Lake reader is available (deltalake, Spark, or databricks-sql). |
| _is_deletion_vectors_error | Return True if e looks like the deltalake ‘deletionVectors’ unsupported-reader-features error. |
| _get_spark_session | Get an active Spark session if available (Databricks notebooks/jobs). |
| _is_unity_catalog_path | Check if path refers to a Unity Catalog table (catalog.schema.table format). |
| is_delta_lake_path | Check if a path refers to a Delta Lake table. |
| _normalize_delta_path | Normalize a Delta Lake path by removing the delta:// prefix if present. |
| _parse_unity_storage_ids | Parse Unity Catalog managed storage IDs from a __unitystorage path. |
| _quote_sql_ident | Quote an identifier for Spark SQL (handles embedded backticks). |
| _build_uc_table_fqn | Build a fully-qualified UC table name with safe quoting. |
| _try_resolve_uc_table_from_system_tables | Best-effort reverse lookup of a UC table name via Databricks system tables. |
| _resolve_uc_table_from_unity_storage_path | If path looks like UC managed storage, try to resolve to catalog.schema.table. |
| _is_location_overlap_error | Return True if e looks like Databricks UC managed-storage overlap. |
Data#
API#
- nemo_automodel.components.datasets.llm.delta_lake_dataset.logger#
‘getLogger(…)’
- nemo_automodel.components.datasets.llm.delta_lake_dataset._DELTALAKE_AVAILABLE: Optional[bool]#
None
- nemo_automodel.components.datasets.llm.delta_lake_dataset._DATABRICKS_SQL_AVAILABLE: Optional[bool]#
None
- nemo_automodel.components.datasets.llm.delta_lake_dataset._PYSPARK_AVAILABLE: Optional[bool]#
None
- nemo_automodel.components.datasets.llm.delta_lake_dataset._check_deltalake_available() bool#
Check if the deltalake package is available.
- nemo_automodel.components.datasets.llm.delta_lake_dataset._check_pyspark_available() bool#
Check if PySpark is available (used as a fallback on Databricks for deletion vectors).
- nemo_automodel.components.datasets.llm.delta_lake_dataset._check_databricks_sql_available() bool#
Check if databricks-sql-connector is available for Unity Catalog access.
- nemo_automodel.components.datasets.llm.delta_lake_dataset._check_delta_reader_available() bool#
Check if any Delta Lake reader is available (deltalake, Spark, or databricks-sql).
- nemo_automodel.components.datasets.llm.delta_lake_dataset._is_deletion_vectors_error(e: BaseException) bool#
Return True if e looks like the deltalake ‘deletionVectors’ unsupported-reader-features error.
- nemo_automodel.components.datasets.llm.delta_lake_dataset._get_spark_session() Optional[Any]#
Get an active Spark session if available (Databricks notebooks/jobs).
- Returns:
A SparkSession instance if available, else None.
- nemo_automodel.components.datasets.llm.delta_lake_dataset._is_unity_catalog_path(path: str) bool#
Check if path refers to a Unity Catalog table (catalog.schema.table format).
- nemo_automodel.components.datasets.llm.delta_lake_dataset.is_delta_lake_path(path: str) bool#
Check if a path refers to a Delta Lake table.
A path is considered a Delta Lake path if:
- It starts with the “delta://” protocol prefix
- It is a local directory containing a “_delta_log” subdirectory
- It starts with “dbfs:/” (Databricks file system)
- Parameters:
path – The path to check.
- Returns:
True if the path is a Delta Lake table, False otherwise.
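A quick illustration of the rules above; the paths are placeholders, and the bare local path is only recognized if a _delta_log directory exists inside it:
```python
from nemo_automodel.components.datasets.llm.delta_lake_dataset import is_delta_lake_path

is_delta_lake_path("delta:///data/my_table")   # True: delta:// prefix
is_delta_lake_path("dbfs:/mnt/tables/train")   # True: Databricks file system path
is_delta_lake_path("/data/my_table")           # True only if /data/my_table/_delta_log exists
is_delta_lake_path("tatsu-lab/alpaca")         # False: a Hugging Face dataset id, not a Delta table
```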
- nemo_automodel.components.datasets.llm.delta_lake_dataset._normalize_delta_path(path: str) str#
Normalize a Delta Lake path by removing the delta:// prefix if present.
- Parameters:
path – The Delta Lake path.
- Returns:
The normalized path suitable for the deltalake library.
- nemo_automodel.components.datasets.llm.delta_lake_dataset._UNITY_STORAGE_TABLE_PATH_RE#
‘compile(…)’
- nemo_automodel.components.datasets.llm.delta_lake_dataset._parse_unity_storage_ids(
- path: str,
Parse Unity Catalog managed storage IDs from a __unitystorage path.
Databricks Unity Catalog managed tables use internal cloud locations of the form …/__unitystorage/catalogs/<catalog_uuid>/tables/<table_uuid>. Direct path access to these locations is blocked on Databricks (“LOCATION_OVERLAP”).
- nemo_automodel.components.datasets.llm.delta_lake_dataset._quote_sql_ident(ident: str) str#
Quote an identifier for Spark SQL (handles embedded backticks).
- nemo_automodel.components.datasets.llm.delta_lake_dataset._build_uc_table_fqn(catalog: str, schema: str, table: str) str#
Build a fully-qualified UC table name with safe quoting.
- nemo_automodel.components.datasets.llm.delta_lake_dataset._try_resolve_uc_table_from_system_tables(
- spark: Any,
- *,
- table_id: Optional[str] = None,
- storage_location: Optional[str] = None,
Best-effort reverse lookup of a UC table name via Databricks system tables.
- nemo_automodel.components.datasets.llm.delta_lake_dataset._resolve_uc_table_from_unity_storage_path(
- spark: Any,
- path: str,
If path looks like UC managed storage, try to resolve to catalog.schema.table.
- nemo_automodel.components.datasets.llm.delta_lake_dataset._is_location_overlap_error(e: BaseException) bool#
Return True if e looks like Databricks UC managed-storage overlap.
- class nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeIterator(
- table_path: str,
- columns: Optional[list] = None,
- storage_options: Optional[Dict[str, str]] = None,
- batch_size: int = 1024,
- version: Optional[int] = None,
- sql_query: Optional[str] = None,
- shard_info: Optional[tuple[int, int]] = None,
Iterator that yields rows from a Delta Lake table.
This class provides a streaming interface for Delta Lake tables, yielding rows as dictionaries one at a time to support memory-efficient iteration over large tables.
Supports tables with deletion vectors (Databricks Runtime 15.4+) via Spark backend (recommended when running in Databricks notebooks/jobs).
- Parameters:
table_path – Path to the Delta Lake table.
columns – Optional list of column names to read. If None, reads all columns.
storage_options – Optional storage options for cloud storage access.
batch_size – Number of rows to read at a time (default: 1024).
version – Optional version of the table to read.
sql_query – Optional SQL query to read the table and/or create alias columns.
shard_info – Optional sharding configuration (num_shards, shard_index). When provided, only rows where row_idx % num_shards == shard_index are yielded.
Initialization
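A minimal sketch of driving the iterator directly, assuming a local Delta table at a placeholder path and using only the constructor arguments documented above:
```python
from nemo_automodel.components.datasets.llm.delta_lake_dataset import DeltaLakeIterator

# Stream two columns from the table, reading 1024 rows per batch and keeping
# only the rows assigned to shard 0 out of 4.
it = DeltaLakeIterator(
    table_path="delta:///path/to/delta_table",  # placeholder
    columns=["question", "answer"],
    batch_size=1024,
    shard_info=(4, 0),
)

for row in it:
    # Each row is a dict mapping column names to values.
    print(row["question"], row["answer"])
    break
```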
- _iter_all_rows() Iterator[Dict[str, Any]]#
Iterate over all rows (no sharding).
- _add_env_storage_options()#
Add storage options from environment variables if not already set.
- _iter_with_spark() Iterator[Dict[str, Any]]#
Iterate using Spark (supports deletion vectors on Databricks).
This backend requires a working SparkSession (e.g., Databricks notebooks/jobs). It is the recommended fallback for Delta tables that use deletion vectors.
- _iter_with_deltalake() Iterator[Dict[str, Any]]#
Iterate using deltalake library.
- _iter_with_databricks_sql() Iterator[Dict[str, Any]]#
Iterate using Databricks SQL Connector (for Unity Catalog tables).
This is the recommended method for accessing Unity Catalog tables as it handles authentication, deletion vectors, and column mapping natively.
- __iter__() Iterator[Dict[str, Any]]#
Iterate over rows in the Delta Lake table.
- Yields:
Dict containing column name to value mappings for each row.
- shard(
- num_shards: int,
- index: int,
Shard the iterator for distributed processing.
- Parameters:
num_shards – Total number of shards.
index – Index of this shard (0-based).
- class nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset(
- table_path: str,
- columns: Optional[list] = None,
- storage_options: Optional[Dict[str, str]] = None,
- version: Optional[int] = None,
- sql_query: Optional[str] = None,
HuggingFace datasets-compatible wrapper for Delta Lake tables.
This class provides better integration with the HuggingFace datasets library, supporting features like sharding, shuffling, and epoch setting for distributed training scenarios.
- Parameters:
table_path – Path to the Delta Lake table.
columns – Optional list of column names to read.
storage_options – Optional dict of storage options for cloud authentication.
version – Optional specific version of the Delta table to read.
Initialization
- __len__() int#
Return the number of rows in the table.
- __getitem__(idx: int) Dict[str, Any]#
Get a specific row by index.
- __iter__() Iterator[Dict[str, Any]]#
Iterate over rows in the dataset.
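A small illustration of the map-style access provided by __len__, __getitem__, and __iter__; the table path is a placeholder:
```python
from nemo_automodel.components.datasets.llm.delta_lake_dataset import DeltaLakeDataset

ds = DeltaLakeDataset(
    table_path="delta:///path/to/delta_table",  # placeholder
    columns=["question", "answer"],
)

print(len(ds))   # number of rows in the table
print(ds[0])     # dict of column name -> value for the first row

for row in ds:   # streaming iteration over all rows
    ...
```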
- set_epoch(epoch: int) None#
Set the current epoch for deterministic shuffling.
- Parameters:
epoch – The epoch number.
- shard(
- num_shards: int,
- index: int,
Shard the dataset for distributed processing.
- Parameters:
num_shards – Total number of shards.
index – Index of this shard (0-based).
- Returns:
Self for method chaining.
- shuffle(
- buffer_size: int = 1000,
- seed: Optional[int] = None,
Configure shuffling for the dataset.
Note: For streaming Delta Lake datasets, shuffling is performed on-the-fly using a shuffle buffer. The actual shuffling happens during iteration.
- Parameters:
buffer_size – Size of the shuffle buffer.
seed – Random seed for reproducibility.
- Returns:
Self for method chaining.
- take(
- n: int,
Limit the dataset to the first n samples.
- Parameters:
n – Number of samples to take.
- Returns:
A new DeltaLakeDataset limited to n samples.
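Putting these methods together, a sketch of the chaining pattern for distributed training; world_size, rank, num_epochs, and the table path are placeholders supplied by the caller:
```python
from nemo_automodel.components.datasets.llm.delta_lake_dataset import DeltaLakeDataset

ds = DeltaLakeDataset(table_path="delta:///path/to/delta_table")  # placeholder

# Each rank streams its own shard, shuffles through a bounded buffer,
# and caps the number of samples.
ds = (
    ds.shard(num_shards=world_size, index=rank)
    .shuffle(buffer_size=1000, seed=42)
    .take(100_000)
)

for epoch in range(num_epochs):
    ds.set_epoch(epoch)  # deterministic shuffling per epoch
    for row in ds:
        ...
```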
- class nemo_automodel.components.datasets.llm.delta_lake_dataset._LimitedDeltaLakeDataset(
- base: nemo_automodel.components.datasets.llm.delta_lake_dataset.DeltaLakeDataset,
- limit: int,
Internal wrapper to limit a Delta Lake dataset to n samples.
Initialization
- __iter__() Iterator[Dict[str, Any]]#
- set_epoch(epoch: int) None#
- shard(num_shards: int, index: int)#
- shuffle(buffer_size: int = 1000, seed: Optional[int] = None)#
- take(n: int)#