Exact Duplicate Removal#
Remove character-for-character duplicate documents using NeMo Curator’s exact duplicate removal workflow. This method computes MD5 hashes for each document’s text and identifies documents with identical hashes as duplicates.
For an overview of all duplicate removal options, refer to Deduplication.
How It Works#
Exact deduplication uses MD5 hashing to identify identical documents:
Computes MD5 hash for each document’s text content
Groups documents by identical hash values
Identifies duplicates and saves IDs for removal
This method targets character-for-character duplicates and is recommended for removing exact copies of documents.
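Conceptually, the identification step reduces to hashing each text and keeping one document per hash value. A minimal pandas sketch of the idea (illustrative only; the actual workflow runs distributed on Ray):
import hashlib

import pandas as pd

# Toy corpus: documents 0 and 1 are character-for-character duplicates
df = pd.DataFrame({
    "doc_id": [0, 1, 2],
    "text": ["hello world", "hello world", "something else"],
})

# Hash each document's text, then flag every document after the first in each hash group
df["md5"] = df["text"].map(lambda t: hashlib.md5(t.encode("utf-8")).hexdigest())
duplicate_ids = df.loc[df.duplicated(subset="md5", keep="first"), "doc_id"]
print(duplicate_ids.tolist())  # [1]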
Before You Start#
Prerequisites:
Ray cluster with GPU support (required for distributed processing)
Stable document identifiers for removal (either existing IDs or IDs assigned by the workflow)
Adding Document IDs
If your broader pipeline does not already manage IDs, you can add them with the AddId stage:
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.modules import AddId

pipeline = Pipeline(name="add_ids_for_dedup")
pipeline.add_stage(
    AddId(
        id_field="doc_id",
        id_prefix="corpus",  # Optional prefix
    )
)
For more details, refer to Adding Document IDs.
Quick Start#
Get started with exact deduplication using these examples:
Identify duplicates, then remove them:
from nemo_curator.stages.deduplication.exact.workflow import ExactDeduplicationWorkflow
from nemo_curator.stages.text.deduplication.removal_workflow import TextDuplicatesRemovalWorkflow

# Step 1: Identify duplicates
exact_workflow = ExactDeduplicationWorkflow(
    input_path="input_data/",
    output_path="./results",
    text_field="text",
    assign_id=True,
    perform_removal=False,
    input_filetype="parquet",
)
exact_workflow.run()
# Duplicate IDs saved to ./results/ExactDuplicateIds/

# Step 2: Remove duplicates
removal_workflow = TextDuplicatesRemovalWorkflow(
    input_path="input_data/",
    ids_to_remove_path="./results/ExactDuplicateIds",
    output_path="./deduplicated",
    input_filetype="parquet",
    input_id_field="_curator_dedup_id",
    ids_to_remove_duplicate_id_field="_curator_dedup_id",
    id_generator_path="./results/exact_id_generator.json",
)
removal_workflow.run()
# Clean dataset saved to ./deduplicated/
Identify duplicates only:
from nemo_curator.stages.deduplication.exact.workflow import ExactDeduplicationWorkflow

exact_workflow = ExactDeduplicationWorkflow(
    input_path="input_data/",
    output_path="./results",
    text_field="text",
    assign_id=True,
    perform_removal=False,
)
exact_workflow.run()
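Before running removal, you can sanity-check the identification output. A minimal pandas sketch (assuming the assign_id=True runs above, so the ID column is _curator_dedup_id):
import glob

import pandas as pd

# Each output file holds a single column of document IDs to remove
files = glob.glob("./results/ExactDuplicateIds/*.parquet")
dup_ids = pd.concat((pd.read_parquet(f) for f in files), ignore_index=True)
print(f"{len(dup_ids)} documents flagged for removal")
print(dup_ids["_curator_dedup_id"].head())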
Configuration#
Configure exact deduplication using these key parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| input_path | str \| list[str] | None | Path(s) to input files or directories |
| output_path | str | Required | Directory to write duplicate IDs and ID generator |
| text_field | str | "text" | Name of the text field in input data |
| assign_id | bool | True | Whether to automatically assign unique IDs |
| id_field | str \| None | None | Existing ID field name (if assign_id=False) |
| input_filetype | str | "parquet" | Input file format ("parquet" or "jsonl") |
| input_blocksize | str \| int | "2GiB" | Size of input blocks for processing |
| perform_removal | bool | False | Reserved; must remain False (removal is handled by TextDuplicatesRemovalWorkflow) |
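For example, a JSONL dataset that already carries stable document IDs can skip ID assignment. A sketch (the id field name is illustrative):
from nemo_curator.stages.deduplication.exact.workflow import ExactDeduplicationWorkflow

workflow = ExactDeduplicationWorkflow(
    input_path="input_data/",
    output_path="./results",
    text_field="text",
    input_filetype="jsonl",
    assign_id=False,
    id_field="id",  # existing stable ID column in the input
    perform_removal=False,
)
workflow.run()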
Advanced Configuration
Cloud Storage:
workflow = ExactDeduplicationWorkflow(
    input_path="s3://bucket/input/",
    output_path="s3://bucket/output/",
    read_kwargs={
        "storage_options": {"key": "<access_key>", "secret": "<secret_key>"}
    },
    write_kwargs={
        "storage_options": {"key": "<access_key>", "secret": "<secret_key>"}
    },
    # ... other parameters
)
Passing Environment Variables:
You can pass environment variables to the Ray executor by using the env_vars parameter on ExactDeduplicationWorkflow. For example:
env_vars = {
    "UCX_TLS": "rc,cuda_copy,cuda_ipc",
    "UCX_IB_GPU_DIRECT_RDMA": "yes",
}
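The dictionary is then passed to the workflow constructor:
workflow = ExactDeduplicationWorkflow(
    input_path="input_data/",
    output_path="./results",
    env_vars=env_vars,  # forwarded to the Ray executor
    # ... other parameters
)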
Removing Duplicates#
After identifying duplicates, use TextDuplicatesRemovalWorkflow to remove them:
from nemo_curator.stages.text.deduplication.removal_workflow import TextDuplicatesRemovalWorkflow
removal_workflow = TextDuplicatesRemovalWorkflow(
    input_path="/path/to/input/data",
    ids_to_remove_path="/path/to/output/ExactDuplicateIds",
    output_path="/path/to/deduplicated",
    input_filetype="parquet",
    input_id_field="_curator_dedup_id",
    ids_to_remove_duplicate_id_field="_curator_dedup_id",
    id_generator_path="/path/to/output/exact_id_generator.json",  # Required if assign_id=True
)
removal_workflow.run()
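As a quick sanity check on the removal step (a sketch assuming local Parquet directories that pandas can read directly):
import pandas as pd

# Compare document counts before and after duplicate removal
before = pd.read_parquet("/path/to/input/data")
after = pd.read_parquet("/path/to/deduplicated")
print(f"Removed {len(before) - len(after)} duplicate documents")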
ID Field Configuration
When assign_id=True:
The duplicate IDs file contains the _curator_dedup_id column
Set ids_to_remove_duplicate_id_field="_curator_dedup_id"
id_generator_path is required
When assign_id=False:
The duplicate IDs file contains the column specified by id_field (e.g., "id")
Set ids_to_remove_duplicate_id_field to match your id_field value
id_generator_path is not required
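For the assign_id=False case, the removal configuration would instead look like this (a sketch; the "id" field name is illustrative):
removal_workflow = TextDuplicatesRemovalWorkflow(
    input_path="/path/to/input/data",
    ids_to_remove_path="/path/to/output/ExactDuplicateIds",
    output_path="/path/to/deduplicated",
    input_filetype="parquet",
    input_id_field="id",
    ids_to_remove_duplicate_id_field="id",
    # id_generator_path is not required when assign_id=False
)
removal_workflow.run()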
Output Format#
The exact deduplication process produces the following directory structure:
output_path/
├── ExactDuplicateIds/            # Duplicate identification results
│   └── *.parquet                 # Parquet files with document IDs to remove
└── exact_id_generator.json       # ID generator mapping (if assign_id=True)
File Formats#
The workflow produces these output files:
Duplicate IDs (ExactDuplicateIds/*.parquet):
Contains document IDs to remove
Format: Parquet files with a single ID column
Column name depends on assign_id:
When assign_id=True: Column is "_curator_dedup_id"
When assign_id=False: Column matches the id_field parameter (e.g., "id")
Important: Contains only the IDs of documents to remove, not the full document content
ID Generator (exact_id_generator.json):
JSON file containing ID generator state
Required for the removal workflow when assign_id=True
Ensures consistent ID mapping across workflow stages
Performance Considerations
Performance characteristics:
Uses MD5 hashing over the configured text field to derive duplicate groups
Runs as a Ray-based workflow and writes duplicate IDs to the ExactDuplicateIds/ directory
Stores only document IDs to remove in the output files, not full document content
Best practices:
Use input_blocksize="2GiB" for optimal performance
Clear the output directory between runs (see the snippet below)
Use assign_id=True for consistent ID tracking
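For example, a simple way to clear stale outputs before re-running (assuming a local output path):
import shutil

# Remove results from a previous run so old duplicate IDs don't mix with new ones
shutil.rmtree("./results", ignore_errors=True)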
Advanced Usage
Integration with existing pipelines:
from nemo_curator.stages.deduplication.exact.workflow import ExactDeduplicationWorkflow
from nemo_curator.tasks import FileGroupTask

initial_tasks = [
    FileGroupTask(
        task_id="batch_0",
        dataset_name="my_dataset",
        data=["/path/to/file1.parquet", "/path/to/file2.parquet"],
        _metadata={"source_files": ["/path/to/file1.parquet", "/path/to/file2.parquet"]},
    )
]

exact_workflow = ExactDeduplicationWorkflow(
    output_path="/path/to/output",
    text_field="text",
    assign_id=True,
)
exact_workflow.run(initial_tasks=initial_tasks)
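Note that input_path is omitted here: the file groups passed through initial_tasks supply the input files directly.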
For comparison with other deduplication methods and guidance on when to use exact deduplication, refer to the Deduplication overview.