
Nemotron-CC Pipelines


Nemotron-CC provides advanced synthetic data generation workflows for transforming and extracting knowledge from existing text documents. Unlike simple generation, these pipelines use sophisticated preprocessing, LLM-based transformation, and postprocessing to create high-quality training data.

The Composable Pipeline Pattern

Nemotron-CC stages follow a composable pattern with three distinct phases:

  1. Preprocessing: Segment documents, filter by length, and prepare inputs for the LLM
  2. Generation: Apply task-specific prompts to transform text using the LLM
  3. Postprocessing: Clean outputs, remove formatting artifacts, and filter low-quality results

This separation enables fine-grained control over each phase while providing reusable helper functions for common patterns.
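As an illustration of the pattern (not the actual NeMo Curator API), the three phases can be sketched as plain functions composed in sequence. The functions below are hypothetical stand-ins for the real stage classes:

```python
# Illustrative sketch of the three-phase pattern; these plain functions are
# hypothetical stand-ins, not the NeMo Curator stage classes.

def preprocess(doc: str, min_tokens: int = 3) -> list[str]:
    """Segment a document into sentences and drop segments that are too short."""
    segments = [s.strip() for s in doc.split(".") if s.strip()]
    return [s for s in segments if len(s.split()) >= min_tokens]

def generate(segment: str) -> str:
    """Stand-in for the LLM call that transforms a segment (e.g. into a Q&A pair)."""
    return f"Question: What does this say? Answer: {segment}"

def postprocess(outputs: list[str]) -> list[str]:
    """Clean outputs and drop empty or malformed results."""
    return [o.strip() for o in outputs if o and "Answer:" in o]

doc = "The sky is blue today. Ok. Water boils at one hundred degrees Celsius."
segments = preprocess(doc)  # the too-short "Ok" segment is filtered out
results = postprocess([generate(s) for s in segments])
print(len(results))  # 2
```

Because each phase is a separate step, you can swap the generation task (or tune the filtering thresholds) without touching the other two phases.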

Pipeline Architecture

Input Data Requirements

Before running a Nemotron-CC pipeline, prepare your input data as Parquet files with the required schema.

Required Schema

| Column | Type | Description |
|---|---|---|
| id | int64 | Unique document identifier. Required by the preprocessing pipeline to reassemble document segments after splitting. |
| text | string | Document content to transform. This is the primary input field for all Nemotron-CC stages. |
| bucketed_results | int64 | Quality score used to route documents to appropriate pipelines. Values typically range from 0-20, where higher scores indicate higher-quality content. |

Quality Score Field

The bucketed_results field contains quality scores that determine which pipeline processes each document:

  • High-quality documents (bucketed_results > 11): Process with DiverseQA, Distill, ExtractKnowledge, or KnowledgeList tasks
  • Low-quality documents (bucketed_results <= 11): Process with WikipediaParaphrasing to improve text quality
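The threshold above can be expressed as a small routing helper (hypothetical, for illustration only):

```python
# Illustrative routing helper; the task names mirror the Nemotron-CC stages.
HIGH_QUALITY_TASKS = ["DiverseQA", "Distill", "ExtractKnowledge", "KnowledgeList"]

def route_document(bucketed_results: int) -> list[str]:
    """Route a document to candidate tasks based on its quality score."""
    if bucketed_results > 11:
        return HIGH_QUALITY_TASKS
    return ["WikipediaParaphrasing"]

print(route_document(15))  # ['DiverseQA', 'Distill', 'ExtractKnowledge', 'KnowledgeList']
print(route_document(8))   # ['WikipediaParaphrasing']
```

Note that a score of exactly 11 falls on the low-quality side of the split.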

Generating Quality Scores

Use NeMo Curator’s quality assessment tools to generate quality scores before running SDG pipelines:

```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import ParquetWriter
from nemo_curator.stages.text.classifiers import FineWebEduClassifier
from nemo_curator.stages.text.modules import AddId

# Create pipeline to score documents
pipeline = Pipeline(name="quality_scoring")

# Read raw documents
pipeline.add_stage(JsonlReader(file_paths="raw_data/*.jsonl", fields=["text"]))

# Add unique document IDs
pipeline.add_stage(AddId(id_field="id"))

# Score document quality (outputs int score 0-5)
pipeline.add_stage(
    FineWebEduClassifier(
        int_score_field="bucketed_results",  # Use this as quality score
    )
)

# Save as Parquet for SDG pipeline
pipeline.add_stage(ParquetWriter(path="scored_data/"))

results = pipeline.run()
```

The example above uses FineWebEduClassifier, which outputs scores in the 0-5 range. To work with the Nemotron-CC threshold of 11, you can:

  • Scale the scores (e.g., multiply by 4)
  • Adjust the filter threshold in your SDG pipeline
  • Use a different classifier that outputs scores in the 0-20 range
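For example, scaling the 0-5 scores into the 0-20 range can be a one-line pandas transform on the scored output (a sketch with a hypothetical in-memory frame; in practice you would read the scored Parquet files):

```python
import pandas as pd

# Hypothetical scored output: FineWebEduClassifier scores in the 0-5 range
df = pd.DataFrame(
    {"id": [1, 2, 3], "text": ["a", "b", "c"], "bucketed_results": [2, 3, 5]}
)

# Scale 0-5 scores by 4 to land in the 0-20 range expected by the threshold of 11
df["bucketed_results"] = df["bucketed_results"] * 4
print(df["bucketed_results"].tolist())  # [8, 12, 20]
```

After scaling, only the documents originally scored 3 or higher clear the threshold of 11.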

For detailed information on quality scoring options, see Quality Assessment & Filtering.

Example Data

An example Parquet file with the correct schema is available in the tutorials directory:

tutorials/synthetic/nemotron_cc/example_data/data.parquet

You can inspect its structure:

```python
import pandas as pd

df = pd.read_parquet("tutorials/synthetic/nemotron_cc/example_data/data.parquet")
print(df.columns.tolist())  # ['id', 'text', 'bucketed_results']
print(df.head(2))
```

Available Tasks

Nemotron-CC provides five specialized generation tasks, each designed for specific data transformation needs:

| Task | Stage Class | Purpose | Use Case |
|---|---|---|---|
| Wikipedia Paraphrasing | WikipediaParaphrasingStage | Rewrite text as Wikipedia-style prose | Improving noisy web data |
| Diverse QA | DiverseQAStage | Generate diverse Q&A pairs | Reading comprehension training |
| Distill | DistillStage | Create condensed, informative paraphrases | Knowledge distillation |
| Extract Knowledge | ExtractKnowledgeStage | Extract factual content as passages | Knowledge base creation |
| Knowledge List | KnowledgeListStage | Extract structured fact lists | Fact extraction |

Quality-Based Processing Strategy

Nemotron-CC pipelines are designed to process data based on quality scores. The typical approach:

High-Quality Data Pipeline

For documents with high quality scores, use tasks that leverage the existing quality:

  • DiverseQA: Generate Q&A pairs from well-structured content
  • Distill: Create condensed versions preserving key information
  • ExtractKnowledge: Extract factual passages
  • KnowledgeList: Extract structured facts

```python
from nemo_curator.stages.text.modules.score_filter import Filter

# Filter for high-quality documents (score > 11)
pipeline.add_stage(
    Filter(
        filter_fn=lambda x: int(x) > 11,
        filter_field="bucketed_results",
    ),
)
```

Low-Quality Data Pipeline

For documents with lower quality scores, use Wikipedia Paraphrasing to improve text quality:

```python
# Filter for low-quality documents (score <= 11)
pipeline.add_stage(
    Filter(
        filter_fn=lambda x: int(x) <= 11,
        filter_field="bucketed_results",
    ),
)
```

Using Helper Functions

The recommended approach is to use the helper functions in nemotron_cc_pipelines.py:

The nemotron_cc_pipelines helper functions are provided in the tutorials directory, not as part of the installed package. Copy the nemotron_cc_pipelines.py file to your project or reference the patterns when building custom pipelines.

```python
from nemotron_cc_pipelines import (
    add_preprocessing_pipeline,
    add_diverse_qa_postprocessing_pipeline,
)
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.synthetic.nemotron_cc.nemotron_cc import DiverseQAStage

pipeline = Pipeline(name="diverse_qa_pipeline")

# Add preprocessing
pipeline = add_preprocessing_pipeline(
    pipeline=pipeline,
    text_field="text",
    system_prompt=SYSTEM_PROMPT,
    user_prompt_template=PROMPT_TEMPLATE,
    min_document_tokens=30,
    min_segment_tokens=30,
    max_input_tokens=1000,
    args=args,  # Contains tokenizer config
)

# Add generation stage
pipeline.add_stage(
    DiverseQAStage(
        client=llm_client,
        model_name="meta/llama-3.3-70b-instruct",
        generation_config=generation_config,
        input_field="text",
        output_field="diverse_qa",
    )
)

# Add postprocessing
pipeline = add_diverse_qa_postprocessing_pipeline(
    pipeline=pipeline,
    llm_response_field="diverse_qa",
    args=args,
)
```

Task Configuration

Each task has specific token count and preprocessing requirements:

| Task | Min Doc Tokens | Min Segment Tokens | Max Input Tokens | Max Output Tokens |
|---|---|---|---|---|
| Diverse QA | 30 | 30 | 1000 | 600 |
| Distill | 30 | 10 | 2000 | 1600 |
| Extract Knowledge | 30 | 30 | 1400 | 1400 |
| Knowledge List | 30 | 30 | 1000 | 600 |
| Wikipedia Paraphrasing | 5 | 5 | 512 | 512 |
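If you run several tasks, it can be convenient to keep these limits in a small lookup table and unpack them into the preprocessing helper. The dict below is an illustrative convenience, not part of the installed package:

```python
# Per-task token limits from the table above (illustrative lookup, not a package API)
TASK_TOKEN_CONFIG = {
    "diverse_qa":             {"min_document_tokens": 30, "min_segment_tokens": 30, "max_input_tokens": 1000, "max_output_tokens": 600},
    "distill":                {"min_document_tokens": 30, "min_segment_tokens": 10, "max_input_tokens": 2000, "max_output_tokens": 1600},
    "extract_knowledge":      {"min_document_tokens": 30, "min_segment_tokens": 30, "max_input_tokens": 1400, "max_output_tokens": 1400},
    "knowledge_list":         {"min_document_tokens": 30, "min_segment_tokens": 30, "max_input_tokens": 1000, "max_output_tokens": 600},
    "wikipedia_paraphrasing": {"min_document_tokens": 5,  "min_segment_tokens": 5,  "max_input_tokens": 512,  "max_output_tokens": 512},
}

cfg = TASK_TOKEN_CONFIG["distill"]
print(cfg["max_input_tokens"])  # 2000

# The preprocessing-related keys could then be unpacked into the helper, e.g.:
# pipeline = add_preprocessing_pipeline(
#     pipeline, text_field="text", ...,
#     **{k: v for k, v in cfg.items() if k != "max_output_tokens"},
# )
```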

Quick Example

```python
import os

from transformers import AutoTokenizer

from nemo_curator.core.client import RayClient
from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.models.client import AsyncOpenAIClient
from nemo_curator.models.client.llm_client import GenerationConfig
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.synthetic.nemotron_cc.nemotron_cc import DiverseQAStage
from nemo_curator.stages.text.io.reader.parquet import ParquetReader
from nemo_curator.stages.text.io.writer.parquet import ParquetWriter

# Initialize
client = RayClient(include_dashboard=False)
client.start()
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.3-70B-Instruct")

# Create LLM client
llm_client = AsyncOpenAIClient(
    api_key=os.environ["NVIDIA_API_KEY"],
    base_url="https://integrate.api.nvidia.com/v1",
    max_concurrent_requests=5,
)

# Build pipeline (see "Using Helper Functions" section for preprocessing/postprocessing)
pipeline = Pipeline(name="nemotron_cc_diverse_qa")
pipeline.add_stage(ParquetReader(file_paths=["./input_data/*.parquet"]))

# Add preprocessing stages using helper function:
# pipeline = add_preprocessing_pipeline(pipeline, text_field="text", ...)

# Add generation stage
pipeline.add_stage(
    DiverseQAStage(
        client=llm_client,
        model_name="meta/llama-3.3-70b-instruct",
        generation_config=GenerationConfig(temperature=0.5, top_p=0.9),
        input_field="text",
        output_field="diverse_qa",
    )
)

# Add postprocessing stages using helper function:
# pipeline = add_diverse_qa_postprocessing_pipeline(pipeline, llm_response_field="diverse_qa", ...)

pipeline.add_stage(ParquetWriter(path="./output/"))

# Execute
executor = XennaExecutor()
results = pipeline.run(executor)

client.stop()
```

Detailed Reference