***

description: >-
Score and remove low-quality content using heuristics and ML classifiers with
comprehensive filtering capabilities
categories:

* workflows
  tags:
* quality-assessment
* filtering
* heuristic
* classifier
* distributed
* scoring
  personas:
* data-scientist-focused
* mle-focused
  difficulty: intermediate
  content\_type: workflow
  modality: text-only

***

# Quality Assessment & Filtering

Score and remove low-quality content using heuristics and ML classifiers to prepare your data for model training using NeMo Curator's tools and utilities.

Large datasets often contain many documents considered "low quality." In this context, "low quality" means data we do not want downstream models to learn from, and "high quality" is data we do want them to learn from. The metrics that define quality can vary widely.

## How It Works

NeMo Curator's filtering framework is built around several key components that work within the [data processing architecture ](/about/concepts/text/data/processing):

<Tabs>
  <Tab title="ScoreFilter">
    The `ScoreFilter` is at the center of filtering in NeMo Curator. It applies a filter to a document and optionally saves the score as metadata:

    ```python
    from nemo_curator.pipeline import Pipeline
    from nemo_curator.stages.text.io.reader import JsonlReader
    from nemo_curator.stages.text.io.writer import JsonlWriter
    from nemo_curator.stages.text.modules import ScoreFilter
    from nemo_curator.stages.text.filters import WordCountFilter

    # Create pipeline
    pipeline = Pipeline(name="quality_filtering")

    # Load dataset
    reader = JsonlReader(
        file_paths="books_dataset/*.jsonl",
        fields=["text", "id"]
    )
    pipeline.add_stage(reader)

    # Create and apply filter
    filter_stage = ScoreFilter(
        filter_obj=WordCountFilter(min_words=80),
        text_field="text",
        score_field="word_count",
    )
    pipeline.add_stage(filter_stage)

    # Save filtered dataset
    writer = JsonlWriter(path="long_books/")
    pipeline.add_stage(writer)

    # Execute pipeline (uses XennaExecutor by default)
    results = pipeline.run()
    ```

    <Note>
      **Default Executor**: When you call `pipeline.run()` without specifying an executor, NeMo Curator automatically uses `XennaExecutor()` as the default. You can optionally specify a different executor by passing it as a parameter: `pipeline.run(executor=my_executor)`.
    </Note>

    The filter object implements two key methods:

    * `score_document`: Computes a quality score for a document
    * `keep_document`: Determines if a document should be kept based on its score
  </Tab>

  <Tab title="Filter and Score Modules">
    For more specific use cases, NeMo Curator provides two specialized modules:

    * `Score`: A module that only adds metadata scores to records without filtering
      * Takes a scoring function that evaluates text and returns a score
      * Adds the score to a specified metadata field
      * Useful for analysis or multi-stage filtering pipelines

    ```python
    # Example: Score documents without filtering
    from nemo_curator.stages.text.modules import Score

    scoring_step = Score(
        WordCountFilter().score_document,  # Use just the scoring part
        text_field="text",
        score_field="word_count"
    )
    scored_dataset = scoring_step.process(dataset)
    ```

    * `Filter`: A module that filters based on pre-computed metadata
      * Takes a filter function that evaluates metadata and returns True/False
      * Only uses existing metadata fields (doesn't compute new scores)
      * Efficient for filtering on pre-computed metrics

    ```python
    # Example: Filter using pre-computed scores
    from nemo_curator.stages.text.modules import Filter

    filter_step = Filter(
        lambda score: score &gt;= 100,  # Keep documents with score &gt;= 100
        filter_field="word_count"
    )
    filtered_dataset = filter_step.process(scored_dataset)
    ```

    You can combine these modules in pipelines:

    ```python
    from nemo_curator.pipeline import Pipeline
    from nemo_curator.stages.text.modules import Score, Filter
    # Assume `word_counter` and `symbol_counter` are callables that return numeric scores
    pipeline = Pipeline(name="multi_stage_filtering")
    pipeline.add_stage(Score(word_counter, score_field="word_count"))
    pipeline.add_stage(Score(symbol_counter, score_field="symbol_ratio"))
    pipeline.add_stage(Filter(lambda x: x &gt;= 100, filter_field="word_count"))
    pipeline.add_stage(Filter(lambda x: x &lt;= 0.3, filter_field="symbol_ratio"))
    ```
  </Tab>
</Tabs>

***

## Filtering Approaches

<Cards>
  <Card title="Heuristic Filtering" href="/curate-text/process-data/quality-assessment/heuristic">
    Filter text using configurable rules and metrics
    rules
    metrics
    fast
  </Card>

  <Card title="Classifier Filtering" href="/curate-text/process-data/quality-assessment/classifier">
    Filter text using trained quality classifiers
    ml-models
    quality
    scoring
  </Card>

  <Card title="Distributed Classification" href="/curate-text/process-data/quality-assessment/distributed-classifier">
    GPU-accelerated classification with pre-trained models
    gpu
    distributed
    scalable
  </Card>
</Cards>

## Usage

NeMo Curator provides programmatic interfaces for document filtering through the Pipeline framework:

```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import JsonlWriter
from nemo_curator.stages.text.modules import ScoreFilter
from nemo_curator.stages.text.filters import WordCountFilter

# Create and configure pipeline
pipeline = Pipeline(name="document_filtering")

# Add data loading
reader = JsonlReader(
    file_paths="/path/to/input/data/*.jsonl",
    fields=["text", "id"]
)
pipeline.add_stage(reader)

# Add filtering stage
filter_stage = ScoreFilter(
    filter_obj=WordCountFilter(min_words=80),
    text_field="text",
    score_field="word_count"
)
pipeline.add_stage(filter_stage)

# Add output stage
writer = JsonlWriter(path="/path/to/output/filtered/")
pipeline.add_stage(writer)

# Execute pipeline (uses XennaExecutor by default)
results = pipeline.run()
```

## Best Practices

When filtering large datasets, consider these performance tips:

1. **Order matters**: Place computationally inexpensive filters early in your pipeline
2. **Batch size tuning**: Adjust batch sizes based on your hardware capabilities
3. **Use vectorization**: Implement batched methods for compute-intensive filters
4. **Disk I/O**: Consider compression and chunking strategies for large datasets
5. **Distributed processing**: For TB-scale datasets, use distributed filtering with the XennaExecutor
