Use the ColumnMappedTextInstructionIterableDataset (Streaming)


This guide explains how to use ColumnMappedTextInstructionIterableDataset to stream instruction datasets for LLM fine-tuning, including Delta Lake/Databricks sources.

Unlike ColumnMappedTextInstructionDataset (map-style, non-streaming), this class is a torch.utils.data.IterableDataset and always loads data in streaming mode. This is intentional: it helps ensure data is consumed as a stream and avoids accidentally materializing full datasets/tables to disk or memory (which is especially important for large or sensitive corpora).

When to Use This Dataset

Use ColumnMappedTextInstructionIterableDataset when you need:

  • Streaming-only behavior (e.g., to reduce the risk of accidental data leakage from materializing a full dataset)
  • Delta Lake/Databricks sources (Unity Catalog, cloud lakehouse storage, DBFS, etc.)
  • Very large datasets where map-style loading/caching is undesirable

If you do not need streaming (and you want len(ds) / ds[i]), use ColumnMappedTextInstructionDataset.

Key Differences vs ColumnMappedTextInstructionDataset

  • Iterable: you iterate (for sample in ds:); you cannot rely on len(ds) or ds[i].
  • Always streaming: there is no streaming= flag; it is always enabled.
  • Repeat behavior: by default, repeat_on_exhaustion=True (infinite stream). Set repeat_on_exhaustion=False to do a single pass.
  • (Optional) sharding/shuffle helpers: use .shard(num_shards, index) / .shuffle(buffer_size, seed) when supported by the underlying backend.
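The .shard(num_shards, index) and .shuffle(buffer_size, seed) helpers delegate to the underlying backend, so their exact semantics depend on it. The sketch below is a rough mental model. It shows buffer-based shuffling, which is the approach behind Hugging Face streaming's shuffle(buffer_size=...), and a simple position-based shard. Both functions are hypothetical. Real backends may shard by file or data-source shard rather than by example position. A small buffer_size only mixes rows that are close together in the stream.

```python
import random
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def buffered_shuffle(stream: Iterable[T], buffer_size: int, seed: int) -> Iterator[T]:
    """Approximately shuffle a stream while holding at most `buffer_size` items."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be >= 1")
    rng = random.Random(seed)
    buffer: List[T] = []
    for item in stream:
        if len(buffer) < buffer_size:
            buffer.append(item)  # fill the buffer before emitting anything
            continue
        # Emit a random buffered item and store the incoming item in its slot.
        slot = rng.randrange(buffer_size)
        yield buffer[slot]
        buffer[slot] = item
    # Source exhausted: emit the remaining buffered items in random order.
    rng.shuffle(buffer)
    yield from buffer


def round_robin_shard(stream: Iterable[T], num_shards: int, index: int) -> Iterator[T]:
    """Keep the items whose position is congruent to `index` modulo `num_shards`."""
    for position, item in enumerate(stream):
        if position % num_shards == index:
            yield item


# Two workers (num_shards=2) each see a disjoint half of the stream.
shard_0 = list(round_robin_shard(range(10), num_shards=2, index=0))  # [0, 2, 4, 6, 8]
shuffled = list(buffered_shuffle(range(10), buffer_size=4, seed=0))
```

Both helpers hold at most buffer_size items (or nothing, for sharding) in memory. That is why this style of shuffling suits streams that are too large to materialize.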

The column mapping and tokenization logic are shared with ColumnMappedTextInstructionDataset. See Tokenization Paths for details on output fields (input_ids, labels, attention_mask) and masking behavior.
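With answer_only_loss_mask=True, prompt positions in labels are set to -100. That is the default ignore_index of PyTorch's CrossEntropyLoss, so only answer tokens contribute to the loss. Below is a minimal sketch on already-tokenized IDs. The token IDs are made up, and build_example is a hypothetical helper. The real implementation may also add special tokens, shift labels, or pad, as described under Tokenization Paths.

```python
from typing import Dict, List, Sequence

IGNORE_INDEX = -100  # default ignore_index of torch.nn.CrossEntropyLoss


def build_example(
    prompt_ids: Sequence[int],
    answer_ids: Sequence[int],
    answer_only_loss_mask: bool = True,
) -> Dict[str, List[int]]:
    """Concatenate prompt and answer tokens and build labels/attention_mask."""
    input_ids = list(prompt_ids) + list(answer_ids)
    if answer_only_loss_mask:
        # Prompt positions are ignored by the loss; answer positions are kept.
        labels = [IGNORE_INDEX] * len(prompt_ids) + list(answer_ids)
    else:
        labels = list(input_ids)  # train on every token
    attention_mask = [1] * len(input_ids)  # no padding in this sketch
    return {"input_ids": input_ids, "labels": labels, "attention_mask": attention_mask}


example = build_example([101, 7, 8], [42, 2])
# example["labels"] == [-100, -100, -100, 42, 2]
```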

Quickstart (Hugging Face Streaming)

```python
from transformers import AutoTokenizer

from nemo_automodel.components.datasets.llm.column_mapped_text_instruction_iterable_dataset import (
    ColumnMappedTextInstructionIterableDataset,
)

tokenizer = AutoTokenizer.from_pretrained("meta-llama/Meta-Llama-3-8B")

ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="Muennighoff/natural-instructions",
    split="train",
    column_mapping={
        "context": "definition",
        "question": "inputs",
        "answer": "targets",
    },
    tokenizer=tokenizer,
    # Optional:
    # limit_dataset_samples=10_000,
    # repeat_on_exhaustion=False,  # do one pass instead of infinite stream
)

sample = next(iter(ds))
print(sample.keys())  # input_ids / labels / attention_mask (and ___PAD_TOKEN_IDS___)
```
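column_mapping maps the dataset's roles (context, question, answer) to source column names, so the quickstart reads the definition, inputs and targets columns. The hypothetical apply_column_mapping below sketches that renaming on a single made-up row. It is not the library's code. In this sketch, unmapped columns (such as id) are simply dropped and a missing mapped column raises KeyError.

```python
from typing import Any, Dict, Mapping


def apply_column_mapping(row: Mapping[str, Any], column_mapping: Mapping[str, str]) -> Dict[str, Any]:
    """Rename source columns to the role names used by the dataset (hypothetical)."""
    missing = [src for src in column_mapping.values() if src not in row]
    if missing:
        raise KeyError(f"row is missing mapped columns: {missing}")
    return {role: row[src] for role, src in column_mapping.items()}


# Made-up row shaped like the quickstart's source columns.
row = {"definition": "Translate to French.", "inputs": "Hello", "targets": "Bonjour", "id": "task1"}
mapping = {"context": "definition", "question": "inputs", "answer": "targets"}
print(apply_column_mapping(row, mapping))
# {'context': 'Translate to French.', 'question': 'Hello', 'answer': 'Bonjour'}
```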

Delta Lake/Databricks

ColumnMappedTextInstructionIterableDataset supports Delta Lake tables from:

  • Local Delta tables (directories containing _delta_log)
  • Cloud storage (S3, Azure Blob/ADLS via abfss://, GCS via gs://)
  • Databricks (DBFS paths and Unity Catalog tables)
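One way to picture how a single path_or_dataset_id string can cover all of these sources is a prefix-and-directory check. classify_source is a hypothetical sketch of such dispatch, not the library's actual logic. In particular, it assumes every s3://, abfss://, gs:// and dbfs:/ path is a Delta table and that anything else which is not a local Delta directory is a Hugging Face Hub ID.

```python
import os
import tempfile

# Hypothetical URI prefixes treated as remote Delta locations in this sketch.
REMOTE_PREFIXES = ("s3://", "abfss://", "gs://", "dbfs:/")


def classify_source(path_or_dataset_id: str) -> str:
    """Guess which backend a path_or_dataset_id string points at (illustrative only)."""
    if path_or_dataset_id.startswith("delta://"):
        return "delta-uc"  # explicit prefix, e.g. delta://catalog.schema.table
    if path_or_dataset_id.startswith(REMOTE_PREFIXES):
        return "delta-cloud"  # S3, ADLS Gen2, GCS or DBFS location
    if os.path.isdir(os.path.join(path_or_dataset_id, "_delta_log")):
        return "delta-local"  # local directory holding a Delta transaction log
    return "huggingface"  # fall back to a Hugging Face Hub dataset ID


# Demo: a throwaway directory that looks like a local Delta table.
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "_delta_log"))
    local_kind = classify_source(tmp)  # "delta-local"

hub_kind = classify_source("Muennighoff/natural-instructions")  # "huggingface"
```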

Installation

Install the basic Delta Lake reader:

```bash
pip install deltalake
```

For Unity Catalog access outside of Spark (optional), install:

```bash
pip install databricks-sql-connector
```

Local Delta Table

```python
ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="/path/to/delta_table",  # directory containing _delta_log
    column_mapping={"question": "prompt", "answer": "completion"},
    tokenizer=tokenizer,
)
```

Databricks Unity Catalog

Use the delta:// prefix so the loader selects the Delta backend:

```python
ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="delta://catalog.schema.instruction_data",
    column_mapping={
        "context": "system_prompt",
        "question": "user_input",
        "answer": "assistant_response",
    },
    tokenizer=tokenizer,
    delta_storage_options={
        "DATABRICKS_TOKEN": "dapi...",  # or set DATABRICKS_TOKEN env var
        "DATABRICKS_HOST": "https://your-workspace.databricks.com",
        # Optional (depending on how you connect):
        # "DATABRICKS_HTTP_PATH": "/sql/1.0/warehouses/...",
    },
)
```
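Unity Catalog tables use a three-level catalog.schema.table name. Below is a hypothetical helper that splits a delta:// identifier into those parts. It assumes the name after the prefix has exactly three dot-separated components; backtick-quoted names that contain dots are not handled. It is an illustration, not the loader's parser.

```python
from typing import Tuple

PREFIX = "delta://"


def parse_unity_catalog_name(uri: str) -> Tuple[str, str, str]:
    """Split 'delta://catalog.schema.table' into its three parts (hypothetical helper)."""
    if not uri.startswith(PREFIX):
        raise ValueError(f"expected a {PREFIX} URI, got {uri!r}")
    parts = uri[len(PREFIX):].split(".")
    # Unity Catalog names have three levels; reject anything else in this sketch.
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected catalog.schema.table, got {uri!r}")
    catalog, schema, table = parts
    return catalog, schema, table


print(parse_unity_catalog_name("delta://catalog.schema.instruction_data"))
# ('catalog', 'schema', 'instruction_data')
```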

Cloud Storage (S3/Azure/GCS)

```python
# S3 Delta table
ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="s3://my-bucket/path/to/delta_table",
    column_mapping={"question": "input", "answer": "output"},
    tokenizer=tokenizer,
    delta_storage_options={
        "AWS_ACCESS_KEY_ID": "...",
        "AWS_SECRET_ACCESS_KEY": "...",
        "AWS_REGION": "us-east-1",
    },
)

# Azure (ADLS Gen2/ABFS)
ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="abfss://container@account.dfs.core.windows.net/delta_table",
    column_mapping={"question": "input", "answer": "output"},
    tokenizer=tokenizer,
    delta_storage_options={
        "AZURE_STORAGE_ACCOUNT_NAME": "...",
        "AZURE_STORAGE_ACCOUNT_KEY": "...",
    },
)
```

YAML Configuration (Delta Lake/Databricks)

```yaml
dataset:
  _target_: nemo_automodel.components.datasets.llm.column_mapped_text_instruction_iterable_dataset.ColumnMappedTextInstructionIterableDataset
  path_or_dataset_id: delta://catalog.schema.training_data
  column_mapping:
    context: system_prompt
    question: user_message
    answer: assistant_message
  answer_only_loss_mask: true
  delta_storage_options:
    DATABRICKS_TOKEN: ${oc.env:DATABRICKS_TOKEN}
    DATABRICKS_HOST: ${oc.env:DATABRICKS_HOST}
    # Optional:
    # DATABRICKS_HTTP_PATH: ${oc.env:DATABRICKS_HTTP_PATH}
```

Streaming from a Delta SQL Query (Computed/Aliased Columns)

If you want to generate columns dynamically (joins, filters, computed prompt strings, etc.), pass a SQL query via delta_sql_query. The query's output columns must match the column names referenced in your column_mapping.

```python
ds = ColumnMappedTextInstructionIterableDataset(
    path_or_dataset_id="delta://catalog.schema.training_data",
    column_mapping={"question": "question", "answer": "answer"},
    tokenizer=tokenizer,
    delta_storage_options={
        "DATABRICKS_HOST": "https://your-workspace.databricks.com",
        "DATABRICKS_TOKEN": "dapi...",
        "DATABRICKS_HTTP_PATH": "/sql/1.0/warehouses/...",
    },
    delta_sql_query="""
        SELECT
            concat(system_prompt, '\n', user_message) AS question,
            assistant_message AS answer
        FROM catalog.schema.training_data
        WHERE split = 'train'
    """,
)
```

The same setup as a YAML config:

```yaml
dataset:
  _target_: nemo_automodel.components.datasets.llm.column_mapped_text_instruction_iterable_dataset.ColumnMappedTextInstructionIterableDataset
  path_or_dataset_id: delta://catalog.schema.training_data
  column_mapping:
    question: question
    answer: answer
  delta_sql_query: |
    SELECT
      concat(system_prompt, '\n', user_message) AS question,
      assistant_message AS answer
    FROM catalog.schema.training_data
    WHERE split = 'train'
  delta_storage_options:
    DATABRICKS_HOST: ${oc.env:DATABRICKS_HOST}
    DATABRICKS_TOKEN: ${oc.env:DATABRICKS_TOKEN}
    DATABRICKS_HTTP_PATH: ${oc.env:DATABRICKS_HTTP_PATH}
```
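The aliases matter because column_mapping looks columns up by name. To see this locally, here is a stand-in that uses Python's built-in sqlite3 instead of Spark or Databricks SQL, which the real loader requires. SQLite concatenates with || rather than concat(...), but the idea is the same: the query's output column names (question, answer) must match the values in column_mapping. The table, rows and stream_mapped_rows helper are illustrative only.

```python
import sqlite3
from typing import Dict, Iterator, Mapping

# SQLite stand-in for the Spark/Databricks SQL query above ('||' replaces concat).
QUERY = """
SELECT system_prompt || char(10) || user_message AS question,
       assistant_message AS answer
FROM training_data
WHERE split = 'train'
"""


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory table with made-up rows (illustrative only)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE training_data "
        "(system_prompt TEXT, user_message TEXT, assistant_message TEXT, split TEXT)"
    )
    conn.executemany(
        "INSERT INTO training_data VALUES (?, ?, ?, ?)",
        [
            ("You are helpful.", "What is 2+2?", "4", "train"),
            ("You are terse.", "Capital of France?", "Paris", "validation"),
        ],
    )
    return conn


def stream_mapped_rows(
    conn: sqlite3.Connection, query: str, column_mapping: Mapping[str, str]
) -> Iterator[Dict[str, str]]:
    """Yield query results one row at a time, renamed via column_mapping."""
    cursor = conn.execute(query)
    names = [col[0] for col in cursor.description]  # output column aliases
    missing = [col for col in column_mapping.values() if col not in names]
    if missing:
        raise KeyError(f"query does not return mapped columns: {missing}")
    for values in cursor:  # the cursor steps through result rows one at a time
        row = dict(zip(names, values))
        yield {role: row[col] for role, col in column_mapping.items()}


rows = list(stream_mapped_rows(make_demo_db(), QUERY, {"question": "question", "answer": "answer"}))
```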

SQL engine requirement: delta_sql_query is executed via Spark (Databricks runtime/pyspark) when available, otherwise via databricks-sql-connector. It is not supported in a deltalake-only environment.

Authentication: The Delta Lake loader automatically picks up credentials from environment variables (DATABRICKS_TOKEN, AWS_ACCESS_KEY_ID, AZURE_STORAGE_ACCOUNT_KEY, etc.) if not explicitly provided in delta_storage_options.
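A minimal sketch of that precedence follows. It assumes explicit delta_storage_options entries override environment variables and that only a fixed list of keys is read from the environment. resolve_storage_options and ENV_KEYS are hypothetical.

```python
import os
from typing import Dict, Mapping, Optional, Sequence

# Hypothetical list of keys this sketch reads from the environment.
ENV_KEYS = (
    "DATABRICKS_HOST",
    "DATABRICKS_TOKEN",
    "DATABRICKS_HTTP_PATH",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_REGION",
    "AZURE_STORAGE_ACCOUNT_NAME",
    "AZURE_STORAGE_ACCOUNT_KEY",
)


def resolve_storage_options(
    explicit: Optional[Mapping[str, str]],
    keys: Sequence[str] = ENV_KEYS,
    environ: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Merge env-var credentials with explicit options; explicit values win."""
    env = os.environ if environ is None else environ
    resolved = {key: env[key] for key in keys if key in env}
    resolved.update(explicit or {})  # delta_storage_options overrides the environment
    return resolved
```

Passing environ explicitly keeps the function easy to test without touching the real process environment.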

Common Arguments

| Arg | Default | Description |
| --- | --- | --- |
| split | None | Which split to stream from a HF repo (train, validation, etc.). Ignored for local files and Delta tables. |
| name | None | Name of the dataset configuration/subset to load. |
| answer_only_loss_mask | True | Mask prompt tokens in labels with -100 (the CrossEntropyLoss ignore_index). |
| use_hf_chat_template | False | If True and the tokenizer supports chat templates, format via tokenizer.apply_chat_template(...). |
| seq_length | None | Optional max sequence length; used for padding/truncation when enabled. |
| padding | "do_not_pad" | Padding strategy passed to the tokenizer. |
| truncation | "do_not_truncate" | Truncation strategy passed to the tokenizer. |
| limit_dataset_samples | None | Optionally limit the stream to the first N samples (best-effort; depends on backend). |
| repeat_on_exhaustion | True | If True, restart the stream on exhaustion (useful for step-based training). |
| delta_storage_options | None | Storage/auth options for Delta Lake backends (Databricks, S3, Azure, GCS). |
| delta_version | None | Specific Delta table version to read. |
| delta_sql_query | None | SQL query to generate rows for Delta sources (Spark / Databricks SQL only). |
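To reason about limit_dataset_samples and repeat_on_exhaustion together, here is a pure-Python sketch of the behavior the table describes. It assumes the limit applies to each pass and that the source can be reopened for every repeat; iterate_stream is hypothetical. With the default repeat_on_exhaustion=True the stream never ends, so the consumer (for example, a step-based training loop) has to decide when to stop.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def iterate_stream(
    open_source: Callable[[], Iterable[T]],
    limit: Optional[int] = None,
    repeat_on_exhaustion: bool = True,
) -> Iterator[T]:
    """Hypothetical model of limit_dataset_samples + repeat_on_exhaustion."""
    while True:
        source = open_source()  # reopen the underlying stream for each pass
        if limit is not None:
            source = islice(source, limit)  # assumed: the limit applies per pass
        produced = 0
        for item in source:
            produced += 1
            yield item
        # Stop after one pass, or if the source produced nothing (avoids a busy loop).
        if not repeat_on_exhaustion or produced == 0:
            return


toy_rows = ["a", "b", "c"]
# Infinite stream: the consumer decides when to stop (here, after 5 samples).
first_five = list(islice(iterate_stream(lambda: toy_rows, limit=2), 5))  # ['a', 'b', 'a', 'b', 'a']
one_pass = list(iterate_stream(lambda: toy_rows, repeat_on_exhaustion=False))  # ['a', 'b', 'c']
```

When iterating the real dataset by hand, itertools.islice is a convenient way to bound an infinite stream.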