Nemotron-CC Pipelines#
Nemotron-CC provides synthetic data generation workflows that transform existing text documents and extract knowledge from them. Rather than generating text from a prompt alone, these pipelines combine preprocessing, LLM-based transformation, and postprocessing to produce high-quality training data.
The Composable Pipeline Pattern#
Nemotron-CC stages follow a composable pattern with three distinct phases:
Preprocessing: Segment documents, filter by length, and prepare inputs for the LLM
Generation: Apply task-specific prompts to transform text using the LLM
Postprocessing: Clean outputs, remove formatting artifacts, and filter low-quality results
This separation enables fine-grained control over each phase while providing reusable helper functions for common patterns.
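As a rough illustration of this pattern, the sketch below chains three plain Python functions in the same order. All names here (`preprocess`, `generate`, `postprocess`, `fake_llm`) are hypothetical stand-ins and are not part of NeMo Curator; the real stages operate on document batches and call an LLM client.

```python
from typing import Callable

Record = dict[str, str]


def preprocess(records: list[Record], min_words: int) -> list[Record]:
    """Drop documents that are too short to be worth sending to the LLM."""
    return [r for r in records if len(r["text"].split()) >= min_words]


def generate(records: list[Record], llm: Callable[[str], str]) -> list[Record]:
    """Apply a task-specific transformation and store it in a new field."""
    return [{**r, "generated": llm(r["text"])} for r in records]


def postprocess(records: list[Record]) -> list[Record]:
    """Strip simple Markdown emphasis markers and drop empty outputs."""
    cleaned = []
    for r in records:
        text = r["generated"].replace("**", "").strip()
        if text:
            cleaned.append({**r, "generated": text})
    return cleaned


def fake_llm(text: str) -> str:
    """Stand-in for a real LLM call (assumption for this sketch)."""
    return f"**Summary:** {text.upper()}"


docs = [
    {"id": "a", "text": "short"},
    {"id": "b", "text": "a longer document with enough words"},
]
result = postprocess(generate(preprocess(docs, min_words=3), fake_llm))
print(result)
```

Because each phase only consumes and returns a list of records, any one of them can be swapped out or tested on its own, which is the property the composable pattern relies on.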
Pipeline Architecture#
flowchart TB
    subgraph "Preprocessing"
        A[Input Documents] --> B[Token Count Filter]
        B --> C[Document Splitter]
        C --> D[Segment Filter]
        D --> E[Document Joiner]
    end
    subgraph "LLM Generation"
        E --> F[Task-Specific Stage<br/>WikiPara/DiverseQA/Distill/etc.]
    end
    subgraph "Postprocessing"
        F --> G[Token Count Filter]
        G --> H[Markdown Remover]
        H --> I[Task-Specific Cleanup]
        I --> J[Quality Filter]
    end
    J --> K[Output Dataset]
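The preprocessing half of the diagram can be approximated in a few lines. This is a simplified sketch, not the library's implementation: it counts whitespace-separated words instead of tokenizer tokens, splits on newlines, and every function name is hypothetical.

```python
def count_tokens(text: str) -> int:
    """Approximate token count with whitespace splitting (assumption)."""
    return len(text.split())


def split_document(doc: dict) -> list[dict]:
    """Split one document into newline-separated segments that keep its id."""
    return [
        {"id": doc["id"], "text": seg}
        for seg in doc["text"].split("\n")
        if seg.strip()
    ]


def join_segments(segments: list[dict], max_tokens: int) -> list[dict]:
    """Greedily merge consecutive segments of one document up to max_tokens."""
    joined: list[dict] = []
    for seg in segments:
        last = joined[-1] if joined else None
        if (
            last is not None
            and last["id"] == seg["id"]
            and count_tokens(last["text"]) + count_tokens(seg["text"]) <= max_tokens
        ):
            last["text"] += "\n" + seg["text"]
        else:
            joined.append(dict(seg))  # copy so the input segment is not mutated
    return joined


def preprocess(docs, min_doc_tokens, min_segment_tokens, max_tokens):
    """Token count filter -> splitter -> segment filter -> joiner."""
    kept = [d for d in docs if count_tokens(d["text"]) >= min_doc_tokens]
    segments = [s for d in kept for s in split_document(d)]
    segments = [s for s in segments if count_tokens(s["text"]) >= min_segment_tokens]
    return join_segments(segments, max_tokens)


docs = [
    {"id": "d1", "text": "one two three\nfour\nfive six seven eight"},
    {"id": "d2", "text": "tiny"},
]
chunks = preprocess(docs, min_doc_tokens=3, min_segment_tokens=2, max_tokens=10)
print(chunks)
```

Here the short document and the one-word segment are dropped, and the two surviving segments of `d1` are merged because together they fit within `max_tokens`. The joiner compares `id` values, which is why the input data needs a document identifier.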
Input Data Requirements#
Before running a Nemotron-CC pipeline, prepare your input data as Parquet files with the required schema.
Required Schema#
| Column | Type | Description |
|---|---|---|
| id |  | Unique document identifier. Required by the preprocessing pipeline to reassemble document segments after splitting. |
| text |  | Document content to transform. This is the primary input field for all Nemotron-CC stages. |
| bucketed_results |  | Quality score used to route documents to appropriate pipelines. Values typically range from 0 to 20, where higher scores indicate higher-quality content. |
Quality Score Field#
The bucketed_results field contains quality scores that determine which pipeline processes each document:
High-quality documents (bucketed_results > 11): Process with DiverseQA, Distill, ExtractKnowledge, or KnowledgeList tasks
Low-quality documents (bucketed_results <= 11): Process with WikipediaParaphrasing to improve text quality
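The routing rule above reduces to a single threshold comparison. The sketch below is illustrative only: the `QUALITY_THRESHOLD` constant and the `tasks_for_score` function are hypothetical names, and the task labels simply mirror the task names used on this page.

```python
QUALITY_THRESHOLD = 11  # documents scoring above this count as high quality

HIGH_QUALITY_TASKS = ["DiverseQA", "Distill", "ExtractKnowledge", "KnowledgeList"]
LOW_QUALITY_TASKS = ["WikipediaParaphrasing"]


def tasks_for_score(bucketed_results: int) -> list[str]:
    """Return the task names that apply to a document with this quality score."""
    if bucketed_results > QUALITY_THRESHOLD:
        return HIGH_QUALITY_TASKS
    return LOW_QUALITY_TASKS


for score in (5, 11, 12):
    print(score, tasks_for_score(score))
```

Note that a score of exactly 11 falls on the low-quality side, since the high-quality condition is a strict inequality.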
Generating Quality Scores#
Use NeMo Curator’s quality assessment tools to generate quality scores before running SDG pipelines:
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import ParquetWriter
from nemo_curator.stages.text.classifiers import FineWebEduClassifier
from nemo_curator.stages.text.modules import AddId
# Create pipeline to score documents
pipeline = Pipeline(name="quality_scoring")
# Read raw documents
pipeline.add_stage(JsonlReader(file_paths="raw_data/*.jsonl", fields=["text"]))
# Add unique document IDs
pipeline.add_stage(AddId(id_field="id"))
# Score document quality (outputs int score 0-5)
pipeline.add_stage(
    FineWebEduClassifier(
        int_score_field="bucketed_results",  # Use this as quality score
    )
)
# Save as Parquet for SDG pipeline
pipeline.add_stage(ParquetWriter(path="scored_data/"))
results = pipeline.run()
Tip
The example above uses FineWebEduClassifier, which outputs scores from 0 to 5. To work with the Nemotron-CC threshold of 11, choose one of the following:
Scale the scores (e.g., multiply by 4)
Adjust the filter threshold in your SDG pipeline
Use a different classifier that outputs scores in the 0-20 range
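To make the first two options concrete: multiplying a 0-5 integer score by 4 yields the values 0, 4, 8, 12, 16, and 20, so a `> 11` threshold on the scaled score selects the same documents as `>= 3` on the raw score. The helper names below are hypothetical and only demonstrate the arithmetic.

```python
def scale_score(raw_score: int, factor: int = 4) -> int:
    """Map a 0-5 classifier score onto the 0-20 range by multiplication."""
    if not 0 <= raw_score <= 5:
        raise ValueError(f"expected a score in 0-5, got {raw_score}")
    return raw_score * factor


def is_high_quality_scaled(raw_score: int) -> bool:
    """Option 1: scale the score and keep the threshold of 11."""
    return scale_score(raw_score) > 11


def is_high_quality_raw(raw_score: int) -> bool:
    """Option 2: keep the raw score and adjust the threshold instead."""
    return raw_score >= 3


# Both options agree for every possible raw score.
assert all(is_high_quality_scaled(s) == is_high_quality_raw(s) for s in range(6))
```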
See also
For detailed information on quality scoring options, see Quality Assessment & Filtering.
Example Data#
An example Parquet file with the correct schema is available in the tutorials directory:
tutorials/synthetic/nemotron_cc/example_data/data.parquet
You can inspect its structure:
import pandas as pd
df = pd.read_parquet("tutorials/synthetic/nemotron_cc/example_data/data.parquet")
print(df.columns.tolist()) # ['id', 'text', 'bucketed_results']
print(df.head(2))
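Before launching a long-running job, it can help to confirm that the required columns are present. The following is a small sketch; `REQUIRED_COLUMNS` and `missing_columns` are hypothetical names, and the column list is taken from the schema described on this page.

```python
REQUIRED_COLUMNS = ("id", "text", "bucketed_results")


def missing_columns(columns) -> list[str]:
    """Return the required columns that are absent, in schema order."""
    present = set(columns)
    return [name for name in REQUIRED_COLUMNS if name not in present]


print(missing_columns(["id", "text", "bucketed_results"]))  # []
print(missing_columns(["text", "url"]))  # ['id', 'bucketed_results']
```

With pandas, the column index can be passed directly, for example `missing_columns(df.columns)`.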
Available Tasks#
Nemotron-CC provides five specialized generation tasks, each designed for specific data transformation needs:
| Task | Stage Class | Purpose | Use Case |
|---|---|---|---|
| Wikipedia Paraphrasing |  | Rewrite text as Wikipedia-style prose | Improving noisy web data |
| Diverse QA | DiverseQAStage | Generate diverse Q&A pairs | Reading comprehension training |
| Distill |  | Create condensed, informative paraphrases | Knowledge distillation |
| Extract Knowledge |  | Extract factual content as passages | Knowledge base creation |
| Knowledge List |  | Extract structured fact lists | Fact extraction |
Quality-Based Processing Strategy#
Nemotron-CC pipelines are designed to process data based on quality scores. The typical approach:
High-Quality Data Pipeline#
For documents with high quality scores, use tasks that leverage the existing quality:
DiverseQA: Generate Q&A pairs from well-structured content
Distill: Create condensed versions preserving key information
ExtractKnowledge: Extract factual passages
KnowledgeList: Extract structured facts
from nemo_curator.stages.text.modules.score_filter import Filter
# Filter for high-quality documents (score > 11)
pipeline.add_stage(
    Filter(
        filter_fn=lambda x: int(x) > 11,
        filter_field="bucketed_results",
    ),
)
Low-Quality Data Pipeline#
For documents with lower quality scores, use Wikipedia Paraphrasing to improve text quality:
# Filter for low-quality documents (score <= 11)
pipeline.add_stage(
    Filter(
        filter_fn=lambda x: int(x) <= 11,
        filter_field="bucketed_results",
    ),
)
Using Helper Functions#
The recommended approach is to use the helper functions in nemotron_cc_pipelines.py:
Note
The nemotron_cc_pipelines helper functions are provided in the tutorials directory, not as part of the installed package. Copy the nemotron_cc_pipelines.py file to your project or reference the patterns when building custom pipelines.
from nemotron_cc_pipelines import (
    add_preprocessing_pipeline,
    add_diverse_qa_postprocessing_pipeline,
)
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.synthetic.nemotron_cc.nemotron_cc import DiverseQAStage
# SYSTEM_PROMPT, PROMPT_TEMPLATE, args, llm_client, and generation_config
# are not defined in this snippet; define them before running it.
pipeline = Pipeline(name="diverse_qa_pipeline")
# Add preprocessing
pipeline = add_preprocessing_pipeline(
    pipeline=pipeline,
    text_field="text",
    system_prompt=SYSTEM_PROMPT,
    user_prompt_template=PROMPT_TEMPLATE,
    min_document_tokens=30,
    min_segment_tokens=30,
    max_input_tokens=1000,
    args=args,  # Contains tokenizer config
)
# Add generation stage
pipeline.add_stage(
    DiverseQAStage(
        client=llm_client,
        model_name="meta/llama-3.3-70b-instruct",
        generation_config=generation_config,
        input_field="text",
        output_field="diverse_qa",
    )
)
# Add postprocessing
pipeline = add_diverse_qa_postprocessing_pipeline(
    pipeline=pipeline,
    llm_response_field="diverse_qa",
    args=args,
)
Task Configuration#
Each task has specific token count and preprocessing requirements:
| Task | Min Doc Tokens | Min Segment Tokens | Max Input Tokens | Max Output Tokens |
|---|---|---|---|---|
| Diverse QA | 30 | 30 | 1000 | 600 |
| Distill | 30 | 10 | 2000 | 1600 |
| Extract Knowledge | 30 | 30 | 1400 | 1400 |
| Knowledge List | 30 | 30 | 1000 | 600 |
| Wikipedia Paraphrasing | 5 | 5 | 512 | 512 |
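When several tasks share one script, the limits in this table can be kept in a single lookup structure. The sketch below is one possible layout; the `TaskLimits` dataclass and the `TASK_LIMITS` dictionary are hypothetical and not part of NeMo Curator, while the numbers are copied from the table.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskLimits:
    """Token limits for one Nemotron-CC task (hypothetical container)."""

    min_document_tokens: int
    min_segment_tokens: int
    max_input_tokens: int
    max_output_tokens: int


TASK_LIMITS = {
    "diverse_qa": TaskLimits(30, 30, 1000, 600),
    "distill": TaskLimits(30, 10, 2000, 1600),
    "extract_knowledge": TaskLimits(30, 30, 1400, 1400),
    "knowledge_list": TaskLimits(30, 30, 1000, 600),
    "wikipedia_paraphrasing": TaskLimits(5, 5, 512, 512),
}

limits = TASK_LIMITS["distill"]
print(limits.max_input_tokens, limits.max_output_tokens)  # 2000 1600
```

The first three fields line up with the `min_document_tokens`, `min_segment_tokens`, and `max_input_tokens` arguments passed to `add_preprocessing_pipeline` in the helper example.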
Quick Example#
import os
from transformers import AutoTokenizer
from nemo_curator.core.client import RayClient
from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.models.client import AsyncOpenAIClient
from nemo_curator.models.client.llm_client import GenerationConfig
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.synthetic.nemotron_cc.nemotron_cc import DiverseQAStage
from nemo_curator.stages.text.io.reader.parquet import ParquetReader
from nemo_curator.stages.text.io.writer.parquet import ParquetWriter
# Initialize
client = RayClient(include_dashboard=False)
client.start()
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.3-70B-Instruct")
# Create LLM client
llm_client = AsyncOpenAIClient(
    api_key=os.environ["NVIDIA_API_KEY"],
    base_url="https://integrate.api.nvidia.com/v1",
    max_concurrent_requests=5,
)
# Build pipeline (see "Using Helper Functions" section for preprocessing/postprocessing)
pipeline = Pipeline(name="nemotron_cc_diverse_qa")
pipeline.add_stage(ParquetReader(file_paths=["./input_data/*.parquet"]))
# Add preprocessing stages using helper function:
# pipeline = add_preprocessing_pipeline(pipeline, text_field="text", ...)
# Add generation stage
pipeline.add_stage(
    DiverseQAStage(
        client=llm_client,
        model_name="meta/llama-3.3-70b-instruct",
        generation_config=GenerationConfig(temperature=0.5, top_p=0.9),
        input_field="text",
        output_field="diverse_qa",
    )
)
# Add postprocessing stages using helper function:
# pipeline = add_diverse_qa_postprocessing_pipeline(pipeline, llm_response_field="diverse_qa", ...)
pipeline.add_stage(ParquetWriter(path="./output/"))
# Execute
executor = XennaExecutor()
results = pipeline.run(executor)
client.stop()
Detailed Reference#
Detailed reference for each Nemotron-CC stage, its prompts, and its postprocessing.