Text Processing Concepts#

This guide covers the most common text processing workflows in NVIDIA NeMo Curator, based on real-world usage patterns from production data curation pipelines.

Most Common Workflows#

The majority of NeMo Curator users follow these core workflows, typically in this order:

1. Quality Filtering#

Most users start with basic quality filtering, using heuristic filters to remove low-quality content (a minimal sketch follows the list below):

Essential Quality Filters:

  • WordCountFilter - Remove documents that are too short or too long

  • NonAlphaNumericFilter - Remove symbol-heavy content

  • RepeatedLinesFilter - Remove repetitive content

  • PunctuationFilter - Ensure proper sentence structure

  • BoilerPlateStringFilter - Remove template/boilerplate text
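
Each of these plugs into a pipeline through ScoreFilter; the snippet below reuses names from the full example under Implementation Examples:

from nemo_curator.stages.text.modules import ScoreFilter
from nemo_curator.stages.text.filters import WordCountFilter

# Keep documents between 50 and 100,000 words; the score is stored for later inspection
word_count_filter = ScoreFilter(
    score_fn=WordCountFilter(min_words=50, max_words=100000),
    text_field="text",
    score_field="word_count"
)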

2. Content Cleaning and Modification#

Basic text normalization and cleaning operations (a short sketch follows the list):

Common Cleaning Steps:

  • UnicodeReformatter - Normalize Unicode characters

  • NewlineNormalizer - Standardize line breaks

  • Basic HTML/markup removal
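
A short sketch chaining two modifiers through Modify. UnicodeReformatter appears in the examples below; the NewlineNormalizer import path is an assumption, so verify it against your installed version:

from nemo_curator.stages.text.modules import Modify
from nemo_curator.stages.text.modifiers import UnicodeReformatter
# Assumed import path for NewlineNormalizer; confirm in your NeMo Curator version
from nemo_curator.stages.text.modifiers import NewlineNormalizer

# Normalize Unicode first, then standardize line breaks
unicode_stage = Modify(modifier=UnicodeReformatter(), text_field="text")
newline_stage = Modify(modifier=NewlineNormalizer(), text_field="text")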

3. Deduplication#

Remove duplicate and near-duplicate content. For comprehensive coverage of all deduplication approaches, refer to Deduplication Concepts.

Fuzzy Deduplication#

For production datasets, fuzzy deduplication is essential for removing near-duplicate content across sources (a note on tuning the LSH parameters follows the list):

Key Components:

  • FuzzyDeduplicationWorkflow - End-to-end fuzzy deduplication pipeline

  • Ray distributed computing framework for scalability

  • Connected components clustering for duplicate identification
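
The MinHash LSH parameters control the effective similarity threshold: with b bands of r minhashes each, a pair with Jaccard similarity s is flagged with probability 1 - (1 - s^r)^b, which crosses 0.5 at roughly (1/b)^(1/r). A quick check in plain Python (standard LSH math, no NeMo Curator APIs involved):

# Approximate similarity threshold for MinHash LSH
b = 20   # num_bands
r = 13   # minhashes_per_band
threshold = (1.0 / b) ** (1.0 / r)
print(f"approximate threshold: {threshold:.2f}")  # ~0.79, i.e. roughly 80%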

Exact Deduplication#

Remove identical documents; this is especially useful for smaller datasets (a conceptual sketch follows the list):

Implementation:

  • ExactDuplicates - Hash-based exact matching

  • MD5 or SHA-256 hashing for document identification
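
Conceptually, each document is reduced to a digest and repeats are dropped. This standalone sketch illustrates the idea with Python's hashlib; ExactDuplicates applies the same principle at scale:

import hashlib

def doc_hash(text: str) -> str:
    # MD5 is fast and adequate for duplicate detection (not a security use)
    return hashlib.md5(text.encode("utf-8")).hexdigest()

seen = set()
unique_docs = []
for doc in ["hello world", "hello world", "goodbye"]:
    h = doc_hash(doc)
    if h not in seen:
        seen.add(h)
        unique_docs.append(doc)

print(unique_docs)  # ['hello world', 'goodbye']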

Semantic Deduplication#

Remove semantically similar content using embeddings for more sophisticated duplicate detection.
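
The underlying idea: embed each document as a vector and treat pairs whose cosine similarity exceeds a threshold as duplicates. A minimal sketch with NumPy, where the embeddings and the 0.9 threshold are illustrative stand-ins for real model output:

import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Stand-in embeddings; in practice these come from an embedding model
emb_a = np.array([0.10, 0.80, 0.30])
emb_b = np.array([0.12, 0.79, 0.28])

if cosine_similarity(emb_a, emb_b) > 0.9:  # illustrative threshold
    print("near-duplicate pair")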

Core Processing Architecture#

NeMo Curator provides these fundamental building blocks, which users combine into pipelines:

| Component | Purpose | Usage Pattern |
|---|---|---|
| Pipeline | Orchestrate processing stages | Every workflow starts here |
| ScoreFilter | Apply filters with optional scoring | Chain multiple quality filters |
| Modify | Transform document content | Clean and normalize text |
| Reader/Writer Stages | Load and save text data | Input/output for pipelines |
| Processing Stages | Transform DocumentBatch tasks | Core processing components |

Implementation Examples#

Complete Quality Filtering Pipeline#

This is the most common starting workflow in production pipelines:

Quality Filtering Pipeline Code Example
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import JsonlWriter
from nemo_curator.stages.text.modules import ScoreFilter
from nemo_curator.stages.text.filters import (
    WordCountFilter,
    NonAlphaNumericFilter,
    RepeatedLinesFilter,
    PunctuationFilter,
    BoilerPlateStringFilter
)

# Create processing pipeline
pipeline = Pipeline(name="quality_filtering")

# Load dataset - the starting point for all workflows
reader = JsonlReader(file_paths="data/*.jsonl")
pipeline.add_stage(reader)

# Standard quality filtering pipeline (most common)
# Remove too short/long documents (essential)
word_count_filter = ScoreFilter(
    score_fn=WordCountFilter(min_words=50, max_words=100000),
    text_field="text",
    score_field="word_count"
)
pipeline.add_stage(word_count_filter)

# Remove symbol-heavy content
alpha_numeric_filter = ScoreFilter(
    score_fn=NonAlphaNumericFilter(max_non_alpha_numeric_to_text_ratio=0.25),
    text_field="text"
)
pipeline.add_stage(alpha_numeric_filter)

# Remove repetitive content
repeated_lines_filter = ScoreFilter(
    score_fn=RepeatedLinesFilter(max_repeated_line_fraction=0.7),
    text_field="text"
)
pipeline.add_stage(repeated_lines_filter)

# Ensure proper sentence structure
punctuation_filter = ScoreFilter(
    score_fn=PunctuationFilter(max_num_sentences_without_endmark_ratio=0.85),
    text_field="text"
)
pipeline.add_stage(punctuation_filter)

# Remove template/boilerplate text
boilerplate_filter = ScoreFilter(
    score_fn=BoilerPlateStringFilter(),
    text_field="text"
)
pipeline.add_stage(boilerplate_filter)

# Add writer stage
writer = JsonlWriter(path="filtered_data/")
pipeline.add_stage(writer)

# Execute pipeline
results = pipeline.run()

Content Cleaning Pipeline#

Basic text normalization:

Content Cleaning Pipeline Code Example
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import JsonlWriter
from nemo_curator.stages.text.modules import Modify
from nemo_curator.stages.text.modifiers import UnicodeReformatter

# Create cleaning pipeline
pipeline = Pipeline(name="content_cleaning")

# Read input data
reader = JsonlReader(file_paths="input_data/*.jsonl")
pipeline.add_stage(reader)

# Essential cleaning steps
# Normalize unicode characters (very common)
unicode_modifier = Modify(
    modifier=UnicodeReformatter(),
    text_field="text"
)
pipeline.add_stage(unicode_modifier)

# Additional processing steps can be added as needed

# Write cleaned data
writer = JsonlWriter(path="cleaned_data/")
pipeline.add_stage(writer)

# Execute pipeline
results = pipeline.run()

Large-Scale Fuzzy Deduplication#

Critical for production datasets (requires Ray + GPU):

Fuzzy Deduplication Code Example
from nemo_curator.core.client import RayClient
from nemo_curator.stages.deduplication.fuzzy.workflow import FuzzyDeduplicationWorkflow

# Initialize Ray cluster with GPU support (required for fuzzy deduplication)
ray_client = RayClient(num_gpus=4)
ray_client.start()

# Configure fuzzy deduplication workflow (production settings)
fuzzy_workflow = FuzzyDeduplicationWorkflow(
    input_path="/path/to/input/data",
    cache_path="./cache",
    output_path="./output",
    text_field="text",
    perform_removal=False,  # Currently only identification supported
    # LSH parameters for ~80% similarity threshold
    num_bands=20,           # Number of LSH bands
    minhashes_per_band=13,  # Hashes per band
    char_ngrams=24,         # Character n-gram size
    seed=42
)

# Run fuzzy deduplication workflow
fuzzy_workflow.run()

# Cleanup Ray when done
ray_client.stop()

Exact Deduplication (All Dataset Sizes)#

Quick deduplication for any dataset size (requires Ray + GPU):

Exact Deduplication Code Example
from nemo_curator.core.client import RayClient
from nemo_curator.stages.deduplication.exact.workflow import ExactDeduplicationWorkflow

# Initialize Ray cluster with GPU support (required for exact deduplication)
ray_client = RayClient(num_gpus=4)
ray_client.start()

# Configure exact deduplication workflow
exact_workflow = ExactDeduplicationWorkflow(
    input_path="/path/to/input/data",
    output_path="/path/to/output",
    text_field="text",
    perform_removal=False,  # Currently only identification supported
    assign_id=True,         # Automatically assign unique IDs
    input_filetype="parquet"
)

# Run exact deduplication workflow
exact_workflow.run()

# Cleanup Ray when done
ray_client.stop()

Complete End-to-End Pipeline#

Most users combine these steps into a comprehensive workflow:

Complete End-to-End Pipeline Code Example
from nemo_curator.pipeline import Pipeline
from nemo_curator.core.client import RayClient
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import JsonlWriter
from nemo_curator.stages.text.modules import Modify, ScoreFilter
from nemo_curator.stages.text.modifiers import UnicodeReformatter
from nemo_curator.stages.text.filters import (
    WordCountFilter,
    NonAlphaNumericFilter,
    RepeatedLinesFilter,
    BoilerPlateStringFilter,
)
from nemo_curator.stages.deduplication.fuzzy.workflow import FuzzyDeduplicationWorkflow
from nemo_curator.stages.deduplication.exact.workflow import ExactDeduplicationWorkflow

# Complete production pipeline (most common pattern)
def build_production_pipeline():
    pipeline = Pipeline(name="production_processing")

    # Read input data (the pipeline needs a reader stage to run)
    reader = JsonlReader(file_paths="input_data/*.jsonl")
    pipeline.add_stage(reader)

    # 1. Content cleaning first
    unicode_modifier = Modify(
        modifier=UnicodeReformatter(),
        text_field="text"
    )
    pipeline.add_stage(unicode_modifier)
    
    # Note: PII processing requires specialized tools - see PII documentation
    # for proper implementation using dedicated PII processing pipelines
    
    # 2. Quality filtering
    word_filter = ScoreFilter(
        score_fn=WordCountFilter(min_words=50, max_words=100000),
        text_field="text"
    )
    pipeline.add_stage(word_filter)
    
    alpha_filter = ScoreFilter(
        score_fn=NonAlphaNumericFilter(max_non_alpha_numeric_to_text_ratio=0.25),
        text_field="text"
    )
    pipeline.add_stage(alpha_filter)
    
    repeated_filter = ScoreFilter(
        score_fn=RepeatedLinesFilter(max_repeated_line_fraction=0.7),
        text_field="text"
    )
    pipeline.add_stage(repeated_filter)
    
    boilerplate_filter = ScoreFilter(
        score_fn=BoilerPlateStringFilter(),
        text_field="text"
    )
    pipeline.add_stage(boilerplate_filter)
    
    # Write filtered output
    writer = JsonlWriter(path="processed_data/")
    pipeline.add_stage(writer)

    return pipeline

# Apply the complete pipeline
complete_pipeline = build_production_pipeline()
processed_results = complete_pipeline.run()

# Then apply deduplication separately
# For large datasets, use fuzzy deduplication
ray_client = RayClient(num_gpus=4)
ray_client.start()
fuzzy_workflow = FuzzyDeduplicationWorkflow(
    input_path="/path/to/processed/data",
    cache_path="./cache",
    output_path="./output",
    text_field="text"
)
fuzzy_workflow.run()

# For smaller datasets, exact deduplication is usually sufficient
exact_workflow = ExactDeduplicationWorkflow(
    input_path="/path/to/processed/data",
    output_path="./output",
    text_field="text",
    assign_id=True
)
exact_workflow.run()

ray_client.stop()