Image Duplicate Removal Workflow#
Learn how to run a complete image duplicate removal workflow that generates embeddings, identifies semantic duplicates, and removes similar images from your dataset.
Before You Start#
- Complete the Get Started guide.
- Understand basic pipeline concepts from the Image Beginner Tutorial.
1. Generate Image Embeddings#
Create CLIP embeddings for all images in your dataset. This pipeline reads images, generates embeddings, and saves them to Parquet format for duplicate removal processing.
Define the Embedding Pipeline#
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.file_partitioning import FilePartitioningStage
from nemo_curator.stages.image.embedders.clip_embedder import ImageEmbeddingStage
from nemo_curator.stages.image.io.convert import ConvertImageBatchToDocumentBatchStage
from nemo_curator.stages.image.io.image_reader import ImageReaderStage
from nemo_curator.stages.text.io.writer.parquet import ParquetWriter
def create_image_embedding_pipeline(input_dir, embeddings_dir, model_dir):
    """Create pipeline to generate embeddings for duplicate removal."""
    pipeline = Pipeline(
        name="image_embedding",
        description="Generate CLIP embeddings for image duplicate removal",
    )

    # Partition tar files for parallel processing
    pipeline.add_stage(FilePartitioningStage(
        file_paths=input_dir,
        files_per_partition=1,
        file_extensions=[".tar"],
    ))

    # Read images from tar archives
    pipeline.add_stage(ImageReaderStage(
        task_batch_size=100,
        verbose=True,
        num_threads=16,
        num_gpus_per_worker=0.25,
    ))

    # Generate CLIP embeddings
    pipeline.add_stage(ImageEmbeddingStage(
        model_dir=model_dir,
        num_gpus_per_worker=0.25,
        model_inference_batch_size=32,
        verbose=True,
    ))

    # Convert to document format for deduplication
    pipeline.add_stage(ConvertImageBatchToDocumentBatchStage(
        fields=["image_id", "embedding"]
    ))

    # Save embeddings to Parquet
    pipeline.add_stage(ParquetWriter(path=embeddings_dir))

    return pipeline
Run Embedding Generation#
# Set your paths
INPUT_TAR_DIR = "/path/to/input/tar_dataset"
EMBEDDINGS_DIR = "/path/to/embeddings"
MODEL_DIR = "/path/to/models"
# Create and run pipeline
embedding_pipeline = create_image_embedding_pipeline(
INPUT_TAR_DIR, EMBEDDINGS_DIR, MODEL_DIR
)
embedding_pipeline.run() # Uses XennaExecutor by default
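The `run()` call uses the default executor, as the comment notes. If you need to configure the backend explicitly, you can construct an executor and pass it to `run()`. The import path and call signature below are assumptions (not confirmed by this guide); check them against your installed NeMo Curator version.

```python
# Hedged sketch: pass an explicit executor instead of relying on the default.
# The XennaExecutor import path and run(executor) signature are assumptions;
# verify them for your NeMo Curator version before use.
from nemo_curator.backends.xenna import XennaExecutor

executor = XennaExecutor()
embedding_pipeline.run(executor)
```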
Embedding Format Example#
The pipeline writes embeddings to Parquet with two columns:

- image_id: String identifier for the image
- embedding: List of float values with length 768 (CLIP ViT-L/14 dimension)

The output directory contains one Parquet part file per partition:

/path/to/embeddings/
    part-00000-....parquet
    part-00001-....parquet
    part-00002-....parquet

Schema:

image_id: string
embedding: list<float32>  # length = 768 for CLIP ViT-L/14

Example record:

{"image_id": "00001234", "embedding": [0.0123, -0.0456, 0.0031, 0.1279, ...]}

To verify, read the embeddings back with pyarrow:
import pyarrow.parquet as pq
table = pq.read_table("/path/to/embeddings")
df = table.to_pandas()
print(df.head()) # columns: image_id, embedding (list[float])
print(f"Embedding dimension: {len(df.iloc[0]['embedding'])}")
2. Run Semantic Duplicate Removal#
Use the semantic duplicate removal workflow to identify and mark duplicate images based on embedding similarity.
from nemo_curator.stages.deduplication.semantic import SemanticDeduplicationWorkflow
def create_deduplication_workflow(embeddings_dir, removal_dir):
    """Create semantic deduplication workflow."""
    return SemanticDeduplicationWorkflow(
        input_path=embeddings_dir,
        output_path=removal_dir,
        id_field="image_id",
        embedding_field="embedding",
        n_clusters=100,  # Number of clusters for grouping
        eps=0.01,        # Similarity threshold (lower = more strict)
        verbose=True,
    )
# Set paths
EMBEDDINGS_DIR = "/path/to/embeddings"
REMOVAL_DIR = "/path/to/removal_ids"
# Run deduplication
dedup_workflow = create_deduplication_workflow(EMBEDDINGS_DIR, REMOVAL_DIR)
dedup_workflow.run()
Parameters#
| Parameter | Type | Default | Description |
|---|---|---|---|
| `input_path` | str | Required | Path to the directory containing embedding Parquet files |
| `output_path` | str | Required | Path to the directory for duplicate removal results |
| `id_field` | str | Required | Column name containing image identifiers |
| `embedding_field` | str | Required | Column name containing embedding vectors |
| `n_clusters` | int | 100 | Number of clusters for initial grouping (more clusters = faster processing but may miss some duplicates) |
| `eps` | float | 0.01 | Similarity threshold (0–1). Lower values are more strict (see the cosine-distance sketch below). |
| `verbose` | bool | True | Enable verbose logging for debugging |
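To build intuition for eps, you can compute the distance between a pair of embeddings yourself. The sketch below uses plain NumPy and treats eps as a cosine-distance cutoff, which matches the "lower = more strict" behavior described in the table; the exact metric used internally may differ.

```python
import numpy as np

def cosine_distance(a, b):
    """1 - cosine similarity between two embedding vectors."""
    a, b = np.asarray(a), np.asarray(b)
    return 1.0 - float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

# Two toy 4-dimensional "embeddings" that are nearly identical.
emb_a = [0.12, -0.05, 0.33, 0.91]
emb_b = [0.13, -0.05, 0.32, 0.90]

dist = cosine_distance(emb_a, emb_b)
print(f"cosine distance = {dist:.4f}")
print(f"duplicate at eps=0.01? {dist < 0.01}")
```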
3. Remove Duplicate Images#
After identifying duplicates, use ImageDuplicatesRemovalStage to filter them from your dataset. The pipeline below reads the original dataset, drops the images listed in the removal Parquet files, and writes the final deduplicated dataset.
from nemo_curator.stages.image.deduplication.removal import ImageDuplicatesRemovalStage
from nemo_curator.stages.image.io.image_writer import ImageWriterStage
def create_image_removal_pipeline(input_dir, removal_dir, output_dir):
    """Create pipeline to remove duplicate images."""
    pipeline = Pipeline(
        name="image_deduplication",
        description="Remove duplicate images from dataset",
    )

    # Partition input files
    pipeline.add_stage(FilePartitioningStage(
        file_paths=input_dir,
        files_per_partition=1,
        file_extensions=[".tar"],
    ))

    # Read original images
    pipeline.add_stage(ImageReaderStage(
        task_batch_size=100,
        verbose=True,
        num_threads=16,
        num_gpus_per_worker=0.25,
    ))

    # Remove duplicates based on removal list
    pipeline.add_stage(ImageDuplicatesRemovalStage(
        removal_parquets_dir=removal_dir + "/duplicates",
        duplicate_id_field="id",
        verbose=True,
    ))

    # Write deduplicated dataset
    pipeline.add_stage(ImageWriterStage(
        output_dir=output_dir,
        remove_image_data=True,
        verbose=True,
    ))

    return pipeline
Parameters#
| Parameter | Type | Default | Description |
|---|---|---|---|
| `removal_parquets_dir` | str | Required | Directory containing Parquet files with image IDs to remove |
| `duplicate_id_field` | str | `"id"` | Name of the column containing image IDs to remove |
| `verbose` | bool | False | Enable verbose logging for debugging |
| `num_workers_per_node` | int \| None | None | Number of workers per node for the stage (helps avoid OOM when multiple actors load the same removal Parquet files) |
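If removal-stage workers run out of memory while loading the removal Parquet files, capping the workers per node can help. The sketch below would go inside create_image_removal_pipeline; the num_workers_per_node name is inferred from the table above, so confirm it against your installed version.

```python
# Hedged sketch: cap the removal stage at two workers per node to reduce peak memory
# when several actors would otherwise load the same removal Parquet files.
pipeline.add_stage(ImageDuplicatesRemovalStage(
    removal_parquets_dir=removal_dir + "/duplicates",
    duplicate_id_field="id",
    num_workers_per_node=2,  # parameter name assumed from the table above
    verbose=True,
))
```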
Run the Removal Pipeline#
# Set paths
INPUT_TAR_DIR = "/path/to/input/tar_dataset"
REMOVAL_DIR = "/path/to/removal_ids"
OUTPUT_DIR = "/path/to/deduplicated/dataset"
# Run removal pipeline
removal_pipeline = create_image_removal_pipeline(
INPUT_TAR_DIR, REMOVAL_DIR, OUTPUT_DIR
)
removal_pipeline.run() # Uses XennaExecutor by default
4. Inspect Results#
After deduplication, examine the results to understand what was removed:
Check Removal Statistics#
import pandas as pd
from glob import glob
# Read removal results
removal_files = glob(f"{REMOVAL_DIR}/duplicates/*.parquet")
removal_dfs = [pd.read_parquet(f) for f in removal_files]
all_removals = pd.concat(removal_dfs, ignore_index=True)
print(f"Total images marked for removal: {len(all_removals)}")
print(f"Unique images marked for removal: {all_removals['id'].nunique()}")
# Show sample of removed images
print("\nSample removed image IDs:")
print(all_removals['id'].head(10).tolist())
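To put the removal count in context, compare it with the total number of embedded images. This sketch reuses pd, glob, and all_removals from the previous snippet and the EMBEDDINGS_DIR path from Step 1:

```python
# Compare how many images were embedded versus how many were flagged as duplicates.
embedding_files = glob(f"{EMBEDDINGS_DIR}/*.parquet")
total_images = sum(len(pd.read_parquet(f, columns=["image_id"])) for f in embedding_files)

removal_rate = len(all_removals) / total_images * 100
print(f"Embedded images: {total_images}")
print(f"Duplicate rate: {removal_rate:.1f}%")
```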
Compare Dataset Sizes#
import os

def count_tar_files(directory):
    """Count tar files in a directory."""
    tar_files = glob(os.path.join(directory, "*.tar"))
    return len(tar_files)

original_count = count_tar_files(INPUT_TAR_DIR)
deduplicated_count = count_tar_files(OUTPUT_DIR)

print(f"Original dataset: {original_count} tar files")
print(f"Deduplicated dataset: {deduplicated_count} tar files")
print(f"Reduction: {original_count - deduplicated_count} files ({(1 - deduplicated_count/original_count)*100:.1f}%)")
5. Complete Workflow Script#
Here’s the complete workflow that combines all steps:
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.deduplication.semantic import SemanticDeduplicationWorkflow
from nemo_curator.stages.file_partitioning import FilePartitioningStage
from nemo_curator.stages.image.deduplication.removal import ImageDuplicatesRemovalStage
from nemo_curator.stages.image.embedders.clip_embedder import ImageEmbeddingStage
from nemo_curator.stages.image.io.convert import ConvertImageBatchToDocumentBatchStage
from nemo_curator.stages.image.io.image_reader import ImageReaderStage
from nemo_curator.stages.image.io.image_writer import ImageWriterStage
from nemo_curator.stages.text.io.writer.parquet import ParquetWriter
def run_image_deduplication_workflow():
    """Run complete image deduplication workflow."""
    # Define paths
    INPUT_TAR_DIR = "/path/to/input/tar_dataset"
    EMBEDDINGS_DIR = "/path/to/embeddings"
    REMOVAL_DIR = "/path/to/removal_ids"
    OUTPUT_DIR = "/path/to/deduplicated/dataset"
    MODEL_DIR = "/path/to/models"

    print("Step 1: Generating embeddings...")

    # Step 1: Generate embeddings
    embedding_pipeline = Pipeline(name="embedding", description="Generate embeddings")
    embedding_pipeline.add_stage(FilePartitioningStage(
        file_paths=INPUT_TAR_DIR, files_per_partition=1, file_extensions=[".tar"]
    ))
    embedding_pipeline.add_stage(ImageReaderStage(
        task_batch_size=100, verbose=True, num_threads=16, num_gpus_per_worker=0.25
    ))
    embedding_pipeline.add_stage(ImageEmbeddingStage(
        model_dir=MODEL_DIR, num_gpus_per_worker=0.25,
        model_inference_batch_size=32, verbose=True
    ))
    embedding_pipeline.add_stage(ConvertImageBatchToDocumentBatchStage(
        fields=["image_id", "embedding"]
    ))
    embedding_pipeline.add_stage(ParquetWriter(path=EMBEDDINGS_DIR))
    embedding_pipeline.run()  # Uses XennaExecutor by default

    print("Step 2: Running semantic deduplication...")

    # Step 2: Semantic deduplication
    dedup_workflow = SemanticDeduplicationWorkflow(
        input_path=EMBEDDINGS_DIR,
        output_path=REMOVAL_DIR,
        id_field="image_id",
        embedding_field="embedding",
        n_clusters=100,
        eps=0.01,
        verbose=True,
    )
    dedup_workflow.run()

    print("Step 3: Removing duplicate images...")

    # Step 3: Remove duplicates
    removal_pipeline = Pipeline(name="removal", description="Remove duplicates")
    removal_pipeline.add_stage(FilePartitioningStage(
        file_paths=INPUT_TAR_DIR, files_per_partition=1, file_extensions=[".tar"]
    ))
    removal_pipeline.add_stage(ImageReaderStage(
        task_batch_size=100, verbose=True, num_threads=16, num_gpus_per_worker=0.25
    ))
    removal_pipeline.add_stage(ImageDuplicatesRemovalStage(
        removal_parquets_dir=REMOVAL_DIR + "/duplicates",
        duplicate_id_field="id",
        verbose=True,
    ))
    removal_pipeline.add_stage(ImageWriterStage(
        output_dir=OUTPUT_DIR, remove_image_data=True, verbose=True
    ))
    removal_pipeline.run()  # Uses XennaExecutor by default

    print(f"Deduplication complete! Results saved to: {OUTPUT_DIR}")


if __name__ == "__main__":
    run_image_deduplication_workflow()
Next Steps#
After running image deduplication:
- Quality assessment: Manually review a sample of removed duplicates
- Combine with filtering: Run aesthetic/NSFW filtering on the deduplicated data (see the sketch after this list)
- Export for training: Prepare the final curated dataset for ML training
- Monitor metrics: Track deduplication rates across different image types
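For the filtering step, the filter stages from the image curation tutorials can be appended to a pipeline that reads the deduplicated tars. The import paths, stage names, and score_threshold parameters below are assumptions based on those tutorials, not confirmed by this guide; verify them against your installed NeMo Curator version. The sketch reuses OUTPUT_DIR, MODEL_DIR, and the reader/writer stages from the complete workflow script.

```python
# Hedged sketch: run aesthetic and NSFW filtering on the deduplicated dataset.
# ImageAestheticFilterStage / ImageNSFWFilterStage import paths and parameters are
# assumptions from the image curation tutorials; verify before use.
from nemo_curator.stages.image.filters.aesthetic_filter import ImageAestheticFilterStage
from nemo_curator.stages.image.filters.nsfw_filter import ImageNSFWFilterStage

filter_pipeline = Pipeline(name="image_filtering", description="Filter deduplicated images")
filter_pipeline.add_stage(FilePartitioningStage(
    file_paths=OUTPUT_DIR, files_per_partition=1, file_extensions=[".tar"]
))
filter_pipeline.add_stage(ImageReaderStage(
    task_batch_size=100, num_threads=16, num_gpus_per_worker=0.25
))
filter_pipeline.add_stage(ImageAestheticFilterStage(model_dir=MODEL_DIR, score_threshold=0.5))
filter_pipeline.add_stage(ImageNSFWFilterStage(model_dir=MODEL_DIR, score_threshold=0.5))
filter_pipeline.add_stage(ImageWriterStage(output_dir="/path/to/filtered/dataset", remove_image_data=True))
filter_pipeline.run()
```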