Use the ColumnMappedTextInstructionIterableDataset (Streaming)#
This guide explains how to use ColumnMappedTextInstructionIterableDataset to stream instruction datasets for LLM fine-tuning, including Delta Lake/Databricks sources.
Unlike ColumnMappedTextInstructionDataset (map-style, non-streaming), this class is a torch.utils.data.IterableDataset and always loads data in streaming mode. This is intentional: it helps ensure data is consumed as a stream and avoids accidentally materializing full datasets/tables to disk or memory (which is especially important for large or sensitive corpora).
When to Use This Dataset#
Use ColumnMappedTextInstructionIterableDataset when you need:
Streaming-only behavior (e.g., to reduce the risk of accidental data leakage from full dataset materialization)
Delta Lake/Databricks (Unity Catalog, cloud lakehouse storage, DBFS, etc.)
Very large datasets where map-style loading/caching is undesirable
If you do not need streaming (and you want len(ds) / ds[i]), use ColumnMappedTextInstructionDataset.
Key Differences vs ColumnMappedTextInstructionDataset#
Iterable: you iterate (for sample in ds:); you cannot rely on len(ds) or ds[i].
Always streaming: there is no streaming= flag; it is always enabled.
Repeat behavior: by default, repeat_on_exhaustion=True (infinite stream). Set repeat_on_exhaustion=False to do a single pass.
(Optional) sharding/shuffle helpers: use .shard(num_shards, index) / .shuffle(buffer_size, seed) when supported by the underlying backend.
The column mapping and tokenization logic are shared with ColumnMappedTextInstructionDataset. See Tokenization Paths for details on output fields (input_ids, labels, attention_mask) and masking behavior.
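In a training loop, the stream is typically wrapped in a torch DataLoader. The sketch below is illustrative only: it assumes a ds constructed as in the Quickstart that follows, uses a deliberately naive collate function (a real recipe would pad labels with the loss ignore index and input_ids with the pad token), and calls the optional .shard/.shuffle helpers on the assumption that the backend supports them.
import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader

def collate(batch):
    # Naive example collate: pad every field to the longest sample in the batch with zeros.
    return {
        key: pad_sequence([torch.as_tensor(sample[key]) for sample in batch], batch_first=True)
        for key in ("input_ids", "labels", "attention_mask")
    }

stream = ds.shard(num_shards=1, index=0)              # optional; only if the backend supports it
stream = stream.shuffle(buffer_size=10_000, seed=42)  # optional buffered shuffle

loader = DataLoader(stream, batch_size=8, collate_fn=collate)
for batch in loader:  # iterate; len(loader) is undefined for an IterableDataset
    ...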
Quickstart (Hugging Face Streaming)#
from transformers import AutoTokenizer
from nemo_automodel.components.datasets.llm.column_mapped_text_instruction_iterable_dataset import (
ColumnMappedTextInstructionIterableDataset,
)
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Meta-Llama-3-8B")
ds = ColumnMappedTextInstructionIterableDataset(
path_or_dataset_id="Muennighoff/natural-instructions",
split="train",
column_mapping={
"context": "definition",
"question": "inputs",
"answer": "targets",
},
tokenizer=tokenizer,
# Optional:
# limit_dataset_samples=10_000,
# repeat_on_exhaustion=False, # do one pass instead of infinite stream
)
sample = next(iter(ds))
print(sample.keys()) # input_ids / labels / attention_mask (and ___PAD_TOKEN_IDS___)
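To sanity-check what is being streamed, you can decode the sample above back to text and count the supervised tokens. The -100 ignore index below is the common Hugging Face convention and is assumed here; see Tokenization Paths for the exact masking behavior.
print(tokenizer.decode(sample["input_ids"]))

# Count how many tokens contribute to the loss (assumes the usual -100 ignore index in labels).
num_supervised = sum(1 for label in sample["labels"] if label != -100)
print(f"{num_supervised} of {len(sample['labels'])} tokens are supervised")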
Delta Lake/Databricks#
ColumnMappedTextInstructionIterableDataset supports Delta Lake tables from:
Local Delta tables (directories containing _delta_log)
Cloud storage (S3, Azure Blob/ADLS via abfss://, GCS via gs://)
Databricks (DBFS paths and Unity Catalog tables)
Installation#
Install the basic Delta Lake reader:
pip install deltalake
For Unity Catalog access outside of Spark (optional), install:
pip install databricks-sql-connector
Local Delta Table#
ds = ColumnMappedTextInstructionIterableDataset(
path_or_dataset_id="/path/to/delta_table", # directory containing _delta_log
column_mapping={"question": "prompt", "answer": "completion"},
tokenizer=tokenizer,
)
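If you just want to exercise the local path end to end, one option (assuming the deltalake package installed above) is to write a throwaway table with write_deltalake; the prompt/completion column names simply mirror the column_mapping in this example:
import pyarrow as pa
from deltalake import write_deltalake

# Writes a small Delta table (including its _delta_log directory) for local experimentation.
write_deltalake(
    "/path/to/delta_table",
    pa.table({
        "prompt": ["What is the capital of France?", "What is 2 + 2?"],
        "completion": ["Paris", "4"],
    }),
)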
Databricks Unity Catalog#
Use the delta:// prefix so the loader selects the Delta backend:
ds = ColumnMappedTextInstructionIterableDataset(
path_or_dataset_id="delta://catalog.schema.instruction_data",
column_mapping={
"context": "system_prompt",
"question": "user_input",
"answer": "assistant_response",
},
tokenizer=tokenizer,
delta_storage_options={
"DATABRICKS_TOKEN": "dapi...", # or set DATABRICKS_TOKEN env var
"DATABRICKS_HOST": "https://your-workspace.databricks.com",
# Optional (depending on how you connect):
# "DATABRICKS_HTTP_PATH": "/sql/1.0/warehouses/...",
},
)
Cloud Storage (S3/Azure/GCS)#
# S3 Delta table
ds = ColumnMappedTextInstructionIterableDataset(
path_or_dataset_id="s3://my-bucket/path/to/delta_table",
column_mapping={"question": "input", "answer": "output"},
tokenizer=tokenizer,
delta_storage_options={
"AWS_ACCESS_KEY_ID": "...",
"AWS_SECRET_ACCESS_KEY": "...",
"AWS_REGION": "us-east-1",
},
)
# Azure (ADLS Gen2/ABFS)
ds = ColumnMappedTextInstructionIterableDataset(
path_or_dataset_id="abfss://container@account.dfs.core.windows.net/delta_table",
column_mapping={"question": "input", "answer": "output"},
tokenizer=tokenizer,
delta_storage_options={
"AZURE_STORAGE_ACCOUNT_NAME": "...",
"AZURE_STORAGE_ACCOUNT_KEY": "...",
},
)
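A GCS table follows the same pattern. The exact credential keys accepted in delta_storage_options depend on your deltalake version; GOOGLE_SERVICE_ACCOUNT (pointing at a service-account JSON file) is assumed here as one commonly accepted option.
# GCS Delta table
ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="gs://my-bucket/path/to/delta_table",
    column_mapping={"question": "input", "answer": "output"},
    tokenizer=tokenizer,
    delta_storage_options={
        "GOOGLE_SERVICE_ACCOUNT": "/path/to/service-account.json",
    },
)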
YAML Configuration (Delta Lake/Databricks)#
dataset:
_target_: nemo_automodel.components.datasets.llm.column_mapped_text_instruction_iterable_dataset.ColumnMappedTextInstructionIterableDataset
path_or_dataset_id: delta://catalog.schema.training_data
column_mapping:
context: system_prompt
question: user_message
answer: assistant_message
answer_only_loss_mask: true
delta_storage_options:
DATABRICKS_TOKEN: ${oc.env:DATABRICKS_TOKEN}
DATABRICKS_HOST: ${oc.env:DATABRICKS_HOST}
# Optional:
# DATABRICKS_HTTP_PATH: ${oc.env:DATABRICKS_HTTP_PATH}
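If you want to materialize such a block outside of a full training recipe, one option (an assumption, not a mechanism required by the project) is to instantiate the _target_ with Hydra and OmegaConf, supplying the tokenizer at runtime:
from hydra.utils import instantiate
from omegaconf import OmegaConf

cfg = OmegaConf.load("dataset.yaml")                # hypothetical file holding the YAML block above
ds = instantiate(cfg.dataset, tokenizer=tokenizer)  # tokenizer is supplied in code, not via YAML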
Streaming from a Delta SQL Query (Computed/Aliased Columns)#
If you want to generate columns dynamically (joins, filters, computed prompt strings, etc.), pass a SQL query that returns the fields referenced by your column_mapping.
ds = ColumnMappedTextInstructionIterableDataset(
path_or_dataset_id="delta://catalog.schema.training_data",
column_mapping={"question": "question", "answer": "answer"},
tokenizer=tokenizer,
delta_storage_options={
"DATABRICKS_HOST": "https://your-workspace.databricks.com",
"DATABRICKS_TOKEN": "dapi...",
"DATABRICKS_HTTP_PATH": "/sql/1.0/warehouses/...",
},
delta_sql_query="""
SELECT
concat(system_prompt, '\n', user_message) AS question,
assistant_message AS answer
FROM catalog.schema.training_data
WHERE split = 'train'
""",
)
The equivalent YAML configuration:
dataset:
_target_: nemo_automodel.components.datasets.llm.column_mapped_text_instruction_iterable_dataset.ColumnMappedTextInstructionIterableDataset
path_or_dataset_id: delta://catalog.schema.training_data
column_mapping:
question: question
answer: answer
delta_sql_query: |
SELECT
concat(system_prompt, '\n', user_message) AS question,
assistant_message AS answer
FROM catalog.schema.training_data
WHERE split = 'train'
delta_storage_options:
DATABRICKS_HOST: ${oc.env:DATABRICKS_HOST}
DATABRICKS_TOKEN: ${oc.env:DATABRICKS_TOKEN}
DATABRICKS_HTTP_PATH: ${oc.env:DATABRICKS_HTTP_PATH}
Note
SQL engine requirement: delta_sql_query is executed via Spark (Databricks runtime/pyspark) when available, otherwise via databricks-sql-connector. It is not supported in a deltalake-only environment.
Note
Authentication: The Delta Lake loader automatically picks up credentials from environment variables (DATABRICKS_TOKEN, AWS_ACCESS_KEY_ID, AZURE_STORAGE_ACCOUNT_KEY, etc.) if not explicitly provided in delta_storage_options.
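For example, in a notebook or job you might export the Databricks variables up front and omit delta_storage_options entirely (host and token values below are placeholders):
import os

# Placeholders; in practice these come from your environment or a secret manager.
os.environ["DATABRICKS_HOST"] = "https://your-workspace.databricks.com"
os.environ["DATABRICKS_TOKEN"] = "dapi..."

ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="delta://catalog.schema.instruction_data",
    column_mapping={"question": "user_input", "answer": "assistant_response"},
    tokenizer=tokenizer,  # no delta_storage_options: credentials are read from the environment
)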
Common Arguments#
| Arg | Default | Description |
|---|---|---|
| split | | Which split to stream from a HF repo (e.g., train). |
| name | | Name of the dataset configuration/subset to load. |
| answer_only_loss_mask | | Mask prompt tokens in labels so the loss is computed on answer tokens only. |
| | | Optional max sequence length; used for padding/truncation when enabled. |
| padding | | Padding strategy passed to the tokenizer. |
| truncation | | Truncation strategy passed to the tokenizer. |
| limit_dataset_samples | | Optionally limit the stream to the first N samples (best-effort; depends on backend). |
| repeat_on_exhaustion | True | If True, restart the stream when it is exhausted (infinite stream); set False for a single pass. |
| delta_storage_options | | Storage/auth options for Delta Lake backends (Databricks, S3, Azure, GCS). |
| | | Specific Delta table version to read. |
| delta_sql_query | | SQL query to generate rows for Delta sources (Spark / Databricks SQL only). |
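As a closing illustration, here are several of these arguments combined; the values are arbitrary and only arguments already shown elsewhere in this guide are used.
ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="Muennighoff/natural-instructions",
    split="train",
    column_mapping={"context": "definition", "question": "inputs", "answer": "targets"},
    tokenizer=tokenizer,
    answer_only_loss_mask=True,    # compute the loss on answer tokens only
    limit_dataset_samples=50_000,  # best-effort cap on the number of streamed samples
    repeat_on_exhaustion=False,    # single pass instead of an infinite stream
)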