Data Preparation Module#
The nemotron.data_prep module handles last-mile data processing: transforming curated datasets into training-ready formats. It sits between data curation (handled by NeMo Curator) and model training, producing outputs compatible with the NVIDIA AI training stack.
Coming Soon: Native integration with NeMo Curator for data curation and NeMo Data Designer for synthetic data generation. These integrations will connect raw data sources directly to training-ready format production.
Overview#
Built on Ray, the module provides:
- Last-mile processing – convert curated datasets to training-ready formats (tokenization, packing, chat templating)
- Distributed processing – scale from a single machine to a cluster of workers using Ray actors
- Cloud-native I/O – read from HuggingFace Hub (`hf://`), S3 (`s3://`), GCS (`gs://`), or local paths via fsspec
- Deterministic output – frozen shard plans ensure reproducible results across runs
- Resumable pipelines – skip completed shards on restart; verify output integrity with checksums
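To make the resumability guarantee concrete, here is a stdlib-only sketch of checksum-based shard skipping. The function and manifest names are hypothetical (the module itself uses xxhash and its own shard-plan format); this only illustrates the resume rule: redo a shard when its output is missing or fails verification.

```python
import hashlib
import json
from pathlib import Path


def file_checksum(path: Path) -> str:
    """Hash a file's bytes (the real pipeline uses xxhash for speed)."""
    return hashlib.sha256(path.read_bytes()).hexdigest()


def shards_to_process(output_dir: Path, shard_names: list[str]) -> list[str]:
    """Return shards that are missing or fail checksum verification."""
    manifest_path = output_dir / "manifest.json"
    manifest = json.loads(manifest_path.read_text()) if manifest_path.exists() else {}
    pending = []
    for name in shard_names:
        shard = output_dir / name
        if not shard.exists() or manifest.get(name) != file_checksum(shard):
            pending.append(name)  # redo missing or corrupted shards
    return pending
```

On restart, only the pending shards are re-run; completed shards whose checksums match are skipped.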
Data Pipeline: Curator → Data Prep → Training#
This module is designed to work alongside NeMo Curator in a two-stage data pipeline:
```mermaid
%%{init: {'theme': 'base', 'themeVariables': { 'primaryBorderColor': '#333333', 'lineColor': '#333333', 'primaryTextColor': '#333333', 'clusterBkg': '#ffffff', 'clusterBorder': '#333333'}}}%%
flowchart LR
    subgraph sources["Raw Data"]
        CC["CommonCrawl"]
        HF["HuggingFace"]
        Custom["Custom Sources"]
    end
    subgraph curator["NeMo Curator"]
        C1["Deduplication"]
        C2["Quality Filtering"]
        C3["Language ID"]
        C4["PII Removal"]
        C1 --- C2 --- C3 --- C4
    end
    subgraph dataprep["Data Prep"]
        D1["Tokenization"]
        D2["Chat Templating"]
        D3["Sequence Packing"]
        D4["Loss Mask Creation"]
        D1 --- D2 --- D3 --- D4
    end
    subgraph training["Training"]
        T1["Megatron-Bridge"]
        T2["NeMo-RL"]
    end
    sources --> curator
    curator -->|"Curated Data"| dataprep
    dataprep -->|"bin/idx, Parquet, JSONL"| training
```
Typical workflow:
1. Use NeMo Curator to curate raw data at scale: deduplication, quality filtering, language identification, PII removal
2. Use Data Prep to transform curated data into training-ready formats: tokenization, chat templating, sequence packing, loss mask generation
This separation allows you to curate once and prepare data multiple times for different training configurations (different tokenizers, sequence lengths, or output formats).
Recipe Integration#
Each training stage in a recipe includes a dedicated data preparation step that transforms source data into the format required by that stage's training framework:
| Stage | Data Prep Output | Training Framework | Guide |
|---|---|---|---|
| Stage 0: Pretrain | bin/idx indexed datasets | Megatron-Bridge | |
| Stage 1: SFT | Packed Parquet with loss masks | Megatron-Bridge | |
| Stage 2: RL | JSONL with OpenAI chat format | NeMo-RL | |
Run data preparation for any stage using the CLI:
```bash
uv run nemotron nano3 data prep pretrain --run YOUR-CLUSTER  # Stage 0
uv run nemotron nano3 data prep sft --run YOUR-CLUSTER       # Stage 1
uv run nemotron nano3 data prep rl --run YOUR-CLUSTER        # Stage 2
```
Note: The `--run YOUR-CLUSTER` flag submits jobs via NeMo-Run. See Execution through NeMo-Run for setup.
NeMo-Run Integration#
The module integrates natively with NeMo-Run for job orchestration. Submit data preparation jobs to various executors (Slurm, local, Docker, cloud) directly from your local machine:
```bash
# Submit to Slurm cluster
uv run nemotron nano3 data prep pretrain --run YOUR-CLUSTER

# Run locally with Ray
uv run nemotron nano3 data prep pretrain --run local

# Preview without executing
uv run nemotron nano3 data prep pretrain --run YOUR-CLUSTER --dry-run
```
Configure execution profiles in `env.toml`:

```toml
[YOUR-CLUSTER]
executor = "slurm"
account = "YOUR-ACCOUNT"
partition = "batch"
```
See Execution through NeMo-Run for complete configuration options.
Choosing an API#
| API | Use When | Output Format |
|---|---|---|
| `run_pretrain_pipeline()` | Pretraining tokenization | bin/idx |
| `run_sft_pipeline()` | Chat SFT with loss masking | Packed Parquet |

- `run_pretrain_pipeline()` – standard pretraining tokenization to bin/idx format
- `run_sft_pipeline()` – chat SFT with role-based loss masking to packed Parquet
For JSONL output (RL training):
- Use the stage-specific scripts in `nemotron/recipes/nano3/stage2_rl/data_prep.py`
- Or call `process_jsonl_shard_core()` directly from `nemotron.data_prep.core`
Supported Output Formats#
| Format | Recipe | Output | Use Case |
|---|---|---|---|
| BinIdx | `run_pretrain_pipeline()` | `.bin`/`.idx` | Pretraining |
| Packed Parquet | `run_sft_pipeline()` | `.parquet` | Chat SFT |
| JSONL | Stage scripts | `.jsonl` | RL training |
Quick Start#
Pretrain Pipeline (bin/idx)#
Tokenize text data to Megatron bin/idx format:
```python
from nemotron.data_prep import DataBlend, run_pretrain_pipeline

blend = DataBlend.load("pretrain_blend.json")
result = run_pretrain_pipeline(
    blend=blend,
    output_dir="./output",
    tokenizer="nvidia/NVIDIA-Nemotron-Nano-9B-v2",
    num_shards=128,
)
print(f"Run hash: {result.run_hash}")
print(f"Total tokens: {result.total_tokens:,}")
print(f"Data paths: {result.data_paths}")
```
SFT Pipeline (Packed Parquet)#
Chat SFT with role-based loss masking to packed Parquet:
```python
from nemotron.data_prep import DataBlend, run_sft_pipeline

blend = DataBlend.load("sft_blend.json")
result = run_sft_pipeline(
    blend=blend,
    output_dir="./output",
    tokenizer="nvidia/NVIDIA-Nemotron-Nano-9B-v2",
    num_shards=64,
    chat_template="nano3",
    pack_size=4096,
)
print(f"Run hash: {result.run_hash}")
print(f"Total sequences: {result.total_sequences:,}")
```
Output Formats#
BinIdx (Default)#
Tokenized binary format for Megatron pretraining:
```python
from pathlib import Path

from nemotron.data_prep.config import (
    BinIdxOutputConfig,
    OutputConfig,
    PipelineConfig,
    TokenizerConfig,
)

config = PipelineConfig(
    tokenizer=TokenizerConfig(model="meta-llama/Llama-3.2-1B"),
    output=OutputConfig(
        dir=Path("./tokenized"),
        format=BinIdxOutputConfig(
            shard_size="256MB",  # Or num_shards=128
            dtype="int32",
        ),
    ),
)
```
JSONL#
Structured JSONL for SFT/RL training (no tokenization):
```python
from pathlib import Path

from nemotron.data_prep.config import JsonlOutputConfig, OutputConfig, PipelineConfig
from nemotron.data_prep.formats.transforms import sft, openai_chat

# SFT format: {"input": "...", "output": "..."}
config = PipelineConfig(
    output=OutputConfig(
        dir=Path("./sft_data"),
        format=JsonlOutputConfig(
            transform=sft(input="instruction", output="response"),
            compression="zstd",  # Optional compression
        ),
    ),
)

# OpenAI chat format: {"messages": [...]}
config = PipelineConfig(
    output=OutputConfig(
        dir=Path("./rl_data"),
        format=JsonlOutputConfig(
            transform=openai_chat(),
        ),
    ),
)
```
Chat SFT (Packed with Loss Masking)#
Chat-templated SFT with role-based loss masking. This format applies chat templates to OpenAI-format messages, tokenizes them, and produces packed Parquet sequences with a loss mask that zeros out system/user tokens:
```python
from nemotron.data_prep import DataBlend, run_sft_pipeline

blend = DataBlend.load("chat_data.json")
result = run_sft_pipeline(
    blend=blend,
    output_dir="./chat_sft",
    tokenizer="nvidia/NVIDIA-Nemotron-Nano-9B-v2",
    num_shards=64,
    chat_template="nano3",  # Built-in template or path to .jinja file
    pack_size=4096,         # Maximum tokens per packed sequence
    algorithm="first_fit_shuffle",
)
```
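A minimal sketch of what `algorithm="first_fit_shuffle"` does conceptually: shuffle the tokenized sequences, then place each one into the first pack with enough room, opening a new pack when none fits. The function below is illustrative only and operates on bare token-id lists; the real packer also carries loss masks and per-sequence metadata.

```python
import random


def first_fit_shuffle(sequences: list[list[int]], pack_size: int, seed: int = 0) -> list[list[int]]:
    """Shuffle sequences, then place each into the first pack with enough room."""
    order = list(sequences)
    random.Random(seed).shuffle(order)  # deterministic shuffle for reproducibility
    packs: list[list[int]] = []
    for seq in order:
        if len(seq) > pack_size:
            continue  # real pipelines truncate or drop over-length sequences
        for pack in packs:
            if len(pack) + len(seq) <= pack_size:
                pack.extend(seq)
                break
        else:
            packs.append(list(seq))  # no existing pack fits: open a new one
    return packs
```

Packing reduces padding waste: short conversations share a fixed-size training sequence instead of each occupying one.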
Input format (OpenAI chat messages):
```json
{
  "messages": [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Hello!"},
    {"role": "assistant", "content": "Hi there!"}
  ]
}
```
Output:
- `.parquet` files with packed `input_ids` and `loss_mask` arrays
- Loss mask: `0` for system/user tokens, `1` for assistant tokens
- Metadata files for Megatron-Bridge compatibility
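The masking rule can be sketched with a toy whitespace tokenizer. The real pipeline operates on chat-templated token ids, but the rule is the same: zero out every token that the model should not be trained to predict.

```python
def build_loss_mask(messages: list[dict]) -> tuple[list[str], list[int]]:
    """Concatenate message tokens; mask is 1 for assistant tokens, 0 otherwise."""
    tokens: list[str] = []
    mask: list[int] = []
    for msg in messages:
        toks = msg["content"].split()  # toy whitespace "tokenizer"
        tokens.extend(toks)
        mask.extend([1 if msg["role"] == "assistant" else 0] * len(toks))
    return tokens, mask
```

Only positions with mask `1` contribute to the training loss, so the model learns to produce assistant turns without memorizing prompts.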
Transforms#
Transforms convert input records to the desired output format. They are callables that take a dict and return a dict (or None to skip the record).
Built-in Transform Factories#
```python
from nemotron.data_prep.formats.transforms import (
    sft,          # SFT format: {input, output}
    openai_chat,  # OpenAI format: {messages: [...]}
    sharegpt,     # ShareGPT format: {conversations: [...]}
    nemotron_rl,  # Nemotron RL format: {messages, tools}
    passthrough,  # Pass records unchanged
    select,       # Select specific fields
    rename,       # Rename fields
)
```
sft()#
Creates SFT format output:
```python
transform = sft(
    input="instruction",     # Source field for input
    output="response",       # Source field for output
    system="system_prompt",  # Optional system prompt field
)

# Input:  {"instruction": "Hello", "response": "Hi!", "system_prompt": "Be helpful"}
# Output: {"input": "Hello", "output": "Hi!", "system": "Be helpful"}
```
openai_chat()#
Creates OpenAI chat format:
```python
transform = openai_chat(messages="conversation")

# Input:  {"conversation": [{"role": "user", "content": "Hi"}]}
# Output: {"messages": [{"role": "user", "content": "Hi"}]}
```
nemotron_rl()#
Extracts messages and tools from Nemotron RL dataset format:
```python
transform = nemotron_rl()

# Input: {
#   "responses_create_params": {
#     "input": [{"role": "user", "content": "Hi"}],
#     "tools": [{"name": "search", ...}]
#   }
# }
# Output: {"messages": [{"role": "user", "content": "Hi"}], "tools": [...]}
```
Records without valid responses_create_params.input are skipped.
passthrough()#
Passes records unchanged:
```python
transform = passthrough()

# Input:  {"any": "data"}
# Output: {"any": "data"}
```
select()#
Selects specific fields:
```python
transform = select("id", "text")

# Input:  {"id": 1, "text": "hello", "extra": "ignored"}
# Output: {"id": 1, "text": "hello"}
```
rename()#
Renames fields:
```python
transform = rename(input="question", output="answer")

# Input:  {"question": "What?", "answer": "This."}
# Output: {"input": "What?", "output": "This."}
```
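Both factories are easy to picture as closures over the field names. The following sketch is consistent with the examples above but is illustrative only, not the library source.

```python
def select(*fields: str):
    """Keep only the named fields (when present)."""
    def transform(record: dict) -> dict:
        return {k: record[k] for k in fields if k in record}
    return transform


def rename(**mapping: str):
    """Map each output field name to a source field name."""
    def transform(record: dict) -> dict:
        return {new: record[old] for new, old in mapping.items() if old in record}
    return transform
```

The closure pattern is why these are called transform *factories*: calling `select(...)` or `rename(...)` returns the actual transform callable.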
Custom Transforms#
You can use any callable:
```python
# Lambda
transform = lambda r: {"input": r["q"], "output": r["a"]} if r.get("valid") else None

# Function
def my_transform(record: dict) -> dict | None:
    if len(record.get("text", "")) < 10:
        return None  # Skip short records
    return {"input": record["question"], "output": record["answer"]}
```
Filtering Records#
Return None from a transform to skip records. This is useful for filtering out low-quality or malformed data:
```python
def filter_by_length(min_chars: int = 100) -> Transform:
    """Skip records with text shorter than min_chars."""
    def transform(record: dict) -> dict | None:
        text = record.get("text", "")
        if len(text) < min_chars:
            return None  # Record will be skipped
        return record
    return transform

# Usage
config = PipelineConfig(
    output=OutputConfig(
        dir=Path("./filtered"),
        format=JsonlOutputConfig(transform=filter_by_length(min_chars=200)),
    ),
)
```
Built-in transforms also filter: for example, nemotron_rl() skips records missing required fields.
Per-Split Output#
Generate separate train/valid/test outputs using PerSplitConfig:
```python
from pathlib import Path

from nemotron.data_prep import PerSplitConfig
from nemotron.data_prep.config import (
    BinIdxOutputConfig,
    OutputConfig,
    PipelineConfig,
    TokenizerConfig,
)

config = PipelineConfig(
    tokenizer=TokenizerConfig(model="nvidia/NVIDIA-Nemotron-Nano-9B-v2"),
    output=OutputConfig(
        dir=Path("./output"),
        format=BinIdxOutputConfig(num_shards=64),
    ),
    per_split=PerSplitConfig(
        enabled=True,
        valid_shards=1,  # Number of validation shards
        test_shards=1,   # Number of test shards
    ),
)
```
Output structure:
```text
output/
├── train/
│   ├── shard_000000.bin
│   ├── shard_000000.idx
│   └── ...
├── valid/
│   └── shard_000000.bin/.idx
├── test/
│   └── shard_000000.bin/.idx
└── blend.json
```
blend.json format (per-split mode):
```json
{
  "train": [["1.0", "/path/to/train/shard_000000"], ["1.0", "/path/to/train/shard_000001"]],
  "valid": [["1.0", "/path/to/valid/shard_000000"]],
  "test": [["1.0", "/path/to/test/shard_000000"]]
}
```
This format is directly compatible with Megatron-Bridge's per-split data loading.
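Downstream consumers only need to parse the weight/prefix pairs. A stdlib-only sketch (the `load_blend` helper is hypothetical; Megatron-Bridge has its own loader):

```python
import json
from pathlib import Path


def load_blend(path: Path) -> dict[str, list[tuple[float, str]]]:
    """Parse a per-split blend.json into {split: [(weight, prefix), ...]}."""
    raw = json.loads(path.read_text())
    return {
        split: [(float(weight), prefix) for weight, prefix in entries]
        for split, entries in raw.items()
    }
```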
Type Definitions#
TypedDicts are provided for type safety:
```python
from nemotron.data_prep.formats.transforms import (
    SftRecord,         # {"input": str, "output": str}
    OpenAIChatRecord,  # {"messages": list[Message]}
    ShareGPTRecord,    # {"conversations": list[Conversation]}
    Message,           # {"role": str, "content": str}
)
```
API Reference#
Recipe Entry Points#
| Function | Description |
|---|---|
| `run_pretrain_pipeline()` | Tokenize to Megatron bin/idx format |
| `run_sft_pipeline()` | Chat SFT to packed Parquet format |
Core Processing Functions#
| Function | Description |
|---|---|
| | Tokenize shard to bin/idx |
| `process_jsonl_shard_core()` | Transform and write JSONL records |
| | Tokenize chat messages to spool intermediate |
| | Pack spool to Parquet output |
Configuration Classes#
| Class | Description |
|---|---|
| `PipelineConfig` | Pipeline configuration |
| `TokenizerConfig` | Tokenizer settings (model, type, add_bos, add_eos) |
| `OutputConfig` | Output directory and format |
| `BinIdxOutputConfig` | Tokenized binary format options |
| `JsonlOutputConfig` | JSONL format options |
| | Chat SFT with loss masking options |
| | W&B and pipeline logging settings |
Result Classes#
| Class | Description |
|---|---|
| | Pipeline result with run metadata, data paths, and statistics |
| | Artifact with blend.json path and metrics |
Compression#
JSONL output supports optional zstd compression:
```python
format=JsonlOutputConfig(
    compression="zstd",  # Output .jsonl.zst files
)
```

Requires the `zstandard` package: `uv pip install zstandard`
Dependencies#
Core dependencies:
ray- Parallel processingpyarrow- Parquet file readingxxhash- Fast checksums
Optional dependencies:
orjson- Fast JSON serialization (falls back to stdlib json)zstandard- Zstd compression for JSONL output
Further Reading#
- NeMo Curator – data curation at scale (coming soon)
- NeMo Data Designer – synthetic data generation (coming soon)
- NVIDIA AI Stack – Megatron-Core, Megatron-Bridge, NeMo-RL
- Execution through NeMo-Run – job orchestration and execution profiles
- CLI Framework – CLI building and recipe commands
- Artifact Lineage – W&B artifact system and lineage tracking
- Xenna Observability – real-time W&B logging for xenna pipelines
- Nano3 Recipe – training recipe example