Image Duplicate Removal Workflow#

Learn how to run a complete image duplicate removal workflow that generates embeddings, identifies semantic duplicates, and removes similar images from your dataset.

Before You Start#
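
This guide assumes you have a directory of WebDataset-style .tar image shards, a local directory containing the CLIP model weights, and a GPU-enabled environment with NeMo Curator installed with image support. The quick check below is an optional sketch (the paths are the same placeholders used throughout this guide) to confirm the inputs exist before launching any pipelines.

import os
from glob import glob

INPUT_TAR_DIR = "/path/to/input/tar_dataset"  # placeholder used throughout this guide
MODEL_DIR = "/path/to/models"                 # placeholder used throughout this guide

# Confirm there is at least one .tar shard to process
tar_files = glob(os.path.join(INPUT_TAR_DIR, "*.tar"))
print(f"Found {len(tar_files)} tar shards in {INPUT_TAR_DIR}")

# Confirm the model directory exists
print(f"Model directory present: {os.path.isdir(MODEL_DIR)}")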


1. Generate Image Embeddings#

Create CLIP embeddings for all images in your dataset. This pipeline reads images, generates embeddings, and saves them to Parquet format for duplicate removal processing.

Define the Embedding Pipeline#

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.file_partitioning import FilePartitioningStage
from nemo_curator.stages.image.embedders.clip_embedder import ImageEmbeddingStage
from nemo_curator.stages.image.io.convert import ConvertImageBatchToDocumentBatchStage
from nemo_curator.stages.image.io.image_reader import ImageReaderStage
from nemo_curator.stages.text.io.writer.parquet import ParquetWriter

def create_image_embedding_pipeline(input_dir, embeddings_dir, model_dir):
    """Create pipeline to generate embeddings for duplicate removal."""
    
    pipeline = Pipeline(
        name="image_embedding", 
        description="Generate CLIP embeddings for image duplicate removal"
    )
    
    # Partition tar files for parallel processing
    pipeline.add_stage(FilePartitioningStage(
        file_paths=input_dir,
        files_per_partition=1,
        file_extensions=[".tar"],
    ))
    
    # Read images from tar archives
    pipeline.add_stage(ImageReaderStage(
        task_batch_size=100,
        verbose=True,
        num_threads=16,
        num_gpus_per_worker=0.25,
    ))
    
    # Generate CLIP embeddings
    pipeline.add_stage(ImageEmbeddingStage(
        model_dir=model_dir,
        num_gpus_per_worker=0.25,
        model_inference_batch_size=32,
        verbose=True,
    ))
    
    # Convert to document format for deduplication
    pipeline.add_stage(ConvertImageBatchToDocumentBatchStage(
        fields=["image_id", "embedding"]
    ))
    
    # Save embeddings to Parquet
    pipeline.add_stage(ParquetWriter(path=embeddings_dir))
    
    return pipeline

Run Embedding Generation#

# Set your paths
INPUT_TAR_DIR = "/path/to/input/tar_dataset"
EMBEDDINGS_DIR = "/path/to/embeddings"
MODEL_DIR = "/path/to/models"

# Create and run pipeline
embedding_pipeline = create_image_embedding_pipeline(
    INPUT_TAR_DIR, EMBEDDINGS_DIR, MODEL_DIR
)
embedding_pipeline.run()  # Uses XennaExecutor by default
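
Pipeline.run() uses the default executor when called with no arguments. To configure the executor explicitly, a sketch along these lines may work; the XennaExecutor import path and the Pipeline.run() signature can vary between NeMo Curator versions, so verify them against your installation.

# Sketch only: the import path below is an assumption to verify for your version
from nemo_curator.backends.xenna import XennaExecutor

executor = XennaExecutor()
embedding_pipeline.run(executor)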

Embedding Format Example#

The pipeline writes embeddings to Parquet with two columns:

  • image_id: String identifier for the image

  • embedding: List of float values with length 768 (CLIP ViT-L/14 dimension)

The output directory contains one Parquet part file per input partition:

/path/to/embeddings/
  part-00000-....parquet
  part-00001-....parquet
  part-00002-....parquet

Schema:

image_id: string
embedding: list<float32>  # length = 768 for CLIP ViT-L/14

Example row:

{"image_id": "00001234", "embedding": [0.0123, -0.0456, 0.0031, 0.1279, ...]}

To verify the output, read the embeddings back with PyArrow:

import pyarrow.parquet as pq

table = pq.read_table("/path/to/embeddings")
df = table.to_pandas()
print(df.head())  # columns: image_id, embedding (list[float])
print(f"Embedding dimension: {len(df.iloc[0]['embedding'])}")
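
The snippet above inspects only the first row. As an optional sanity check, a short sketch like the following confirms every embedding has the expected dimensionality before moving on to deduplication:

import pyarrow.parquet as pq

# Read all Parquet parts and check embedding lengths across the whole dataset
df = pq.read_table("/path/to/embeddings").to_pandas()
dims = df["embedding"].apply(len)
print(f"Rows: {len(df)}, unique embedding dims: {sorted(dims.unique())}")
assert dims.eq(768).all(), "Unexpected embedding dimension"  # 768 for CLIP ViT-L/14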

2. Run Semantic Duplicate Removal#

Use the semantic duplicate removal workflow to identify and mark duplicate images based on embedding similarity.

from nemo_curator.stages.deduplication.semantic import SemanticDeduplicationWorkflow

def create_deduplication_workflow(embeddings_dir, removal_dir):
    """Create semantic deduplication workflow."""
    
    return SemanticDeduplicationWorkflow(
        input_path=embeddings_dir,
        output_path=removal_dir,
        id_field="image_id",
        embedding_field="embedding",
        n_clusters=100,          # Number of clusters for grouping
        eps=0.01,               # Similarity threshold (lower = more strict)
        verbose=True,
    )

# Set paths
EMBEDDINGS_DIR = "/path/to/embeddings"
REMOVAL_DIR = "/path/to/removal_ids"

# Run deduplication
dedup_workflow = create_deduplication_workflow(EMBEDDINGS_DIR, REMOVAL_DIR)
dedup_workflow.run()

Parameters#

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| input_path | str | Required | Path to directory containing embedding Parquet files |
| output_path | str | Required | Path to directory for duplicate removal results |
| id_field | str | Required | Column name containing image identifiers |
| embedding_field | str | Required | Column name containing embedding vectors |
| n_clusters | int | 100 | Number of clusters for initial grouping (more clusters = faster processing but may miss some duplicates) |
| eps | float | 0.01 | Similarity threshold (0–1). Lower values are more strict: 0.01 = very strict (near-identical), 0.05 = moderate (visually similar), 0.1 = loose (semantically related) |
| verbose | bool | True | Enable verbose logging for debugging |
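
To build intuition for eps, it can help to read it as a distance threshold between normalized embeddings. The sketch below is illustrative only: it assumes cosine distance on L2-normalized vectors, which is the usual setup for CLIP embeddings; the metric the workflow applies internally may differ.

import numpy as np

def cosine_distance(a, b):
    """Return 1 - cosine similarity between two embedding vectors."""
    a = a / np.linalg.norm(a)
    b = b / np.linalg.norm(b)
    return 1.0 - float(np.dot(a, b))

# Two hypothetical 768-dim embeddings of near-duplicate images
rng = np.random.default_rng(0)
emb_a = rng.normal(size=768)
emb_b = emb_a + rng.normal(scale=0.01, size=768)  # tiny perturbation

dist = cosine_distance(emb_a, emb_b)
print(f"distance = {dist:.4f}")
print(f"duplicate at eps=0.01: {dist <= 0.01}")
print(f"duplicate at eps=0.05: {dist <= 0.05}")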


3. Remove Duplicate Images#

After identifying duplicates, use ImageDuplicatesRemovalStage to filter them from the original dataset and write the final deduplicated dataset.

from nemo_curator.stages.image.deduplication.removal import ImageDuplicatesRemovalStage
from nemo_curator.stages.image.io.image_writer import ImageWriterStage

def create_image_removal_pipeline(input_dir, removal_dir, output_dir):
    """Create pipeline to remove duplicate images."""
    
    pipeline = Pipeline(
        name="image_deduplication",
        description="Remove duplicate images from dataset"
    )
    
    # Partition input files
    pipeline.add_stage(FilePartitioningStage(
        file_paths=input_dir,
        files_per_partition=1,
        file_extensions=[".tar"],
    ))
    
    # Read original images
    pipeline.add_stage(ImageReaderStage(
        task_batch_size=100,
        verbose=True,
        num_threads=16,
        num_gpus_per_worker=0.25,
    ))
    
    # Remove duplicates based on removal list
    pipeline.add_stage(ImageDuplicatesRemovalStage(
        removal_parquets_dir=removal_dir + "/duplicates",
        duplicate_id_field="id",
        verbose=True,
    ))
    
    # Write deduplicated dataset
    pipeline.add_stage(ImageWriterStage(
        output_dir=output_dir,
        remove_image_data=True,
        verbose=True,
    ))
    
    return pipeline

Parameters#

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| removal_parquets_dir | str | Required | Directory containing Parquet files with image IDs to remove |
| duplicate_id_field | str | "id" | Name of the column containing image IDs to remove |
| verbose | bool | False | Enable verbose logging for debugging |
| num_workers_per_node | int or None | None | Number of workers per node for the stage (helps avoid OOM when multiple actors load the same removal Parquet files) |
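
The deduplication workflow above was configured with id_field="image_id", while the removal stage defaults to an "id" column. Before running removal, it is worth confirming the actual column name in the removal Parquet files and setting duplicate_id_field to match. The sketch below assumes the duplicates are written under a duplicates/ subdirectory, as in the examples in this guide.

import pyarrow.parquet as pq
from glob import glob

REMOVAL_DIR = "/path/to/removal_ids"  # same placeholder as elsewhere in this guide

removal_files = glob(f"{REMOVAL_DIR}/duplicates/*.parquet")
print(f"Found {len(removal_files)} removal Parquet files")

# Inspect one file's schema to see which column holds the IDs
if removal_files:
    print(pq.read_schema(removal_files[0]))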

Run the Removal Pipeline#

# Set paths
INPUT_TAR_DIR = "/path/to/input/tar_dataset"
REMOVAL_DIR = "/path/to/removal_ids"
OUTPUT_DIR = "/path/to/deduplicated/dataset"

# Run removal pipeline
removal_pipeline = create_image_removal_pipeline(
    INPUT_TAR_DIR, REMOVAL_DIR, OUTPUT_DIR
)
removal_pipeline.run()  # Uses XennaExecutor by default

4. Inspect Results#

After deduplication, examine the results to understand what was removed:

Check Removal Statistics#

import pandas as pd
from glob import glob

# Read removal results
removal_files = glob(f"{REMOVAL_DIR}/duplicates/*.parquet")
removal_dfs = [pd.read_parquet(f) for f in removal_files]
all_removals = pd.concat(removal_dfs, ignore_index=True)

print(f"Total images marked for removal: {len(all_removals)}")
print(f"Unique images marked for removal: {all_removals['id'].nunique()}")

# Show sample of removed images
print("\nSample removed image IDs:")
print(all_removals['id'].head(10).tolist())

Compare Dataset Sizes#

import os
from glob import glob

def count_tar_files(directory):
    """Count tar files in a directory."""
    tar_files = glob(os.path.join(directory, "*.tar"))
    return len(tar_files)

original_count = count_tar_files(INPUT_TAR_DIR)
deduplicated_count = count_tar_files(OUTPUT_DIR)

print(f"Original dataset: {original_count} tar files")
print(f"Deduplicated dataset: {deduplicated_count} tar files")
print(f"Reduction: {original_count - deduplicated_count} files ({(1 - deduplicated_count/original_count)*100:.1f}%)")
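
Tar file counts are only a rough proxy, because the writer may repack the surviving images into a different number of shards. For a more accurate reduction figure, count the image members inside the tar files directly. The sketch below uses the standard-library tarfile module and assumes images are stored as .jpg entries; adjust the extension to match your dataset.

import os
import tarfile
from glob import glob

def count_images_in_tars(directory, extension=".jpg"):
    """Count image members across all tar shards in a directory."""
    total = 0
    for tar_path in glob(os.path.join(directory, "*.tar")):
        with tarfile.open(tar_path) as tf:
            total += sum(1 for name in tf.getnames() if name.endswith(extension))
    return total

original_images = count_images_in_tars(INPUT_TAR_DIR)
deduplicated_images = count_images_in_tars(OUTPUT_DIR)
print(f"Original images: {original_images}")
print(f"Deduplicated images: {deduplicated_images}")
print(f"Removed: {original_images - deduplicated_images}")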

5. Complete Workflow Script#

Here’s the complete workflow that combines all steps:

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.deduplication.semantic import SemanticDeduplicationWorkflow
from nemo_curator.stages.file_partitioning import FilePartitioningStage
from nemo_curator.stages.image.deduplication.removal import ImageDuplicatesRemovalStage
from nemo_curator.stages.image.embedders.clip_embedder import ImageEmbeddingStage
from nemo_curator.stages.image.io.convert import ConvertImageBatchToDocumentBatchStage
from nemo_curator.stages.image.io.image_reader import ImageReaderStage
from nemo_curator.stages.image.io.image_writer import ImageWriterStage
from nemo_curator.stages.text.io.writer.parquet import ParquetWriter

def run_image_deduplication_workflow():
    """Run complete image deduplication workflow."""
    
    # Define paths
    INPUT_TAR_DIR = "/path/to/input/tar_dataset"
    EMBEDDINGS_DIR = "/path/to/embeddings"
    REMOVAL_DIR = "/path/to/removal_ids"
    OUTPUT_DIR = "/path/to/deduplicated/dataset"
    MODEL_DIR = "/path/to/models"
    
    print("Step 1: Generating embeddings...")
    
    # Step 1: Generate embeddings
    embedding_pipeline = Pipeline(name="embedding", description="Generate embeddings")
    
    embedding_pipeline.add_stage(FilePartitioningStage(
        file_paths=INPUT_TAR_DIR, files_per_partition=1, file_extensions=[".tar"]
    ))
    embedding_pipeline.add_stage(ImageReaderStage(
        task_batch_size=100, verbose=True, num_threads=16, num_gpus_per_worker=0.25
    ))
    embedding_pipeline.add_stage(ImageEmbeddingStage(
        model_dir=MODEL_DIR, num_gpus_per_worker=0.25, 
        model_inference_batch_size=32, verbose=True
    ))
    embedding_pipeline.add_stage(ConvertImageBatchToDocumentBatchStage(
        fields=["image_id", "embedding"]
    ))
    embedding_pipeline.add_stage(ParquetWriter(path=EMBEDDINGS_DIR))
    
    embedding_pipeline.run()  # Uses XennaExecutor by default
    
    print("Step 2: Running semantic deduplication...")
    
    # Step 2: Semantic deduplication
    dedup_workflow = SemanticDeduplicationWorkflow(
        input_path=EMBEDDINGS_DIR,
        output_path=REMOVAL_DIR,
        id_field="image_id",
        embedding_field="embedding",
        n_clusters=100,
        eps=0.01,
        verbose=True,
    )
    dedup_workflow.run()
    
    print("Step 3: Removing duplicate images...")
    
    # Step 3: Remove duplicates
    removal_pipeline = Pipeline(name="removal", description="Remove duplicates")
    
    removal_pipeline.add_stage(FilePartitioningStage(
        file_paths=INPUT_TAR_DIR, files_per_partition=1, file_extensions=[".tar"]
    ))
    removal_pipeline.add_stage(ImageReaderStage(
        task_batch_size=100, verbose=True, num_threads=16, num_gpus_per_worker=0.25
    ))
    removal_pipeline.add_stage(ImageDuplicatesRemovalStage(
        removal_parquets_dir=REMOVAL_DIR + "/duplicates",
        duplicate_id_field="id",
        verbose=True,
    ))
    removal_pipeline.add_stage(ImageWriterStage(
        output_dir=OUTPUT_DIR, remove_image_data=True, verbose=True
    ))
    
    removal_pipeline.run()  # Uses XennaExecutor by default
    
    print(f"Deduplication complete! Results saved to: {OUTPUT_DIR}")

if __name__ == "__main__":
    run_image_deduplication_workflow()

Next Steps#

After running image deduplication:

  1. Quality assessment: Manually review a sample of removed duplicates

  2. Combine with filtering: Run aesthetic/NSFW filtering on deduplicated data

  3. Export for training: Prepare final curated dataset for ML training

  4. Monitor metrics: Track deduplication rates across different image types