Image Duplicate Removal Workflow

Learn how to run a complete image duplicate removal workflow that generates embeddings, identifies semantic duplicates, and removes similar images from your dataset.

Before You Start

This tutorial assumes you have NeMo Curator installed with image support, at least one GPU available (the reader and embedding stages each request a fraction of a GPU per worker), your image dataset packaged as WebDataset-style .tar archives, and a local directory for model weights.

1. Generate Image Embeddings

Create CLIP embeddings for all images in your dataset. This pipeline reads images, generates embeddings, and saves them to Parquet format for duplicate removal processing.

Define the Embedding Pipeline

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.file_partitioning import FilePartitioningStage
from nemo_curator.stages.image.embedders.clip_embedder import ImageEmbeddingStage
from nemo_curator.stages.image.io.convert import ConvertImageBatchToDocumentBatchStage
from nemo_curator.stages.image.io.image_reader import ImageReaderStage
from nemo_curator.stages.text.io.writer.parquet import ParquetWriter


def create_image_embedding_pipeline(input_dir, embeddings_dir, model_dir):
    """Create pipeline to generate embeddings for duplicate removal."""
    pipeline = Pipeline(
        name="image_embedding",
        description="Generate CLIP embeddings for image duplicate removal",
    )

    # Partition tar files for parallel processing
    pipeline.add_stage(FilePartitioningStage(
        file_paths=input_dir,
        files_per_partition=1,
        file_extensions=[".tar"],
    ))

    # Read images from tar archives
    pipeline.add_stage(ImageReaderStage(
        batch_size=100,
        verbose=True,
        num_threads=16,
        num_gpus_per_worker=0.25,
    ))

    # Generate CLIP embeddings
    pipeline.add_stage(ImageEmbeddingStage(
        model_dir=model_dir,
        num_gpus_per_worker=0.25,
        model_inference_batch_size=32,
        verbose=True,
    ))

    # Convert to document format for deduplication
    pipeline.add_stage(ConvertImageBatchToDocumentBatchStage(
        fields=["image_id", "embedding"]
    ))

    # Save embeddings to Parquet
    pipeline.add_stage(ParquetWriter(path=embeddings_dir))

    return pipeline

Run Embedding Generation

# Set your paths
INPUT_TAR_DIR = "/path/to/input/tar_dataset"
EMBEDDINGS_DIR = "/path/to/embeddings"
MODEL_DIR = "/path/to/models"

# Create and run pipeline
embedding_pipeline = create_image_embedding_pipeline(
    INPUT_TAR_DIR, EMBEDDINGS_DIR, MODEL_DIR
)
embedding_pipeline.run()  # Uses XennaExecutor by default

Embedding Format Example

The pipeline writes embeddings to Parquet with two columns:

  • image_id: String identifier for the image
  • embedding: List of float values with length 768 (CLIP ViT-L/14 dimension)

The output directory contains a series of Parquet part files:

/path/to/embeddings/
    part-00000-....parquet
    part-00001-....parquet
    part-00002-....parquet
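
To sanity-check the embeddings before moving on, load one shard and confirm the schema. A minimal sketch using pandas; point the glob at your actual embeddings directory:

import pandas as pd
from glob import glob

# Load the first output shard and confirm the expected columns
sample_file = sorted(glob("/path/to/embeddings/*.parquet"))[0]
df = pd.read_parquet(sample_file)

print(df.columns.tolist())           # expect ['image_id', 'embedding']
print(len(df["embedding"].iloc[0]))  # expect 768 for CLIP ViT-L/14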

2. Run Semantic Duplicate Removal

Use the semantic duplicate removal workflow to identify and mark duplicate images based on embedding similarity.

from nemo_curator.stages.deduplication.semantic import SemanticDeduplicationWorkflow


def create_deduplication_workflow(embeddings_dir, removal_dir):
    """Create semantic deduplication workflow."""
    return SemanticDeduplicationWorkflow(
        input_path=embeddings_dir,
        output_path=removal_dir,
        id_field="image_id",
        embedding_field="embedding",
        n_clusters=100,  # Number of clusters for grouping
        eps=0.01,        # Similarity threshold (lower = more strict)
        verbose=True,
    )


# Set paths
EMBEDDINGS_DIR = "/path/to/embeddings"
REMOVAL_DIR = "/path/to/removal_ids"

# Run deduplication
dedup_workflow = create_deduplication_workflow(EMBEDDINGS_DIR, REMOVAL_DIR)
dedup_workflow.run()
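
The two key knobs are n_clusters, which controls how finely the embedding space is partitioned before pairwise comparison, and eps, the threshold that decides when two embeddings count as duplicates. In the usual SemDeDup formulation this is a cosine-distance cutoff applied within each cluster; treat that reading as an assumption here. A minimal NumPy sketch, separate from the NeMo Curator API, to build intuition for the scale of eps:

import numpy as np

def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine distance = 1 - cosine similarity."""
    return 1.0 - float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

rng = np.random.default_rng(0)

# A near-duplicate pair: a small perturbation of the same 768-dim embedding
base = rng.normal(size=768)
near_dup = base + rng.normal(scale=0.05, size=768)
print(f"near-duplicate distance: {cosine_distance(base, near_dup):.5f}")  # ~0.001, under eps=0.01

# An unrelated pair: two independent random vectors
other = rng.normal(size=768)
print(f"unrelated distance: {cosine_distance(base, other):.5f}")  # ~1.0

With eps=0.01, only pairs whose embeddings are nearly collinear are flagged; raising eps removes progressively looser near-duplicates.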

3. Remove Duplicate Images

After identifying duplicates, use ImageDuplicatesRemovalStage to filter the original dataset and write the final deduplicated version. The deduplication workflow saves the IDs to remove as Parquet files under the duplicates/ subdirectory of its output path, with the ID column named id, which is why the stage below sets removal_parquets_dir and duplicate_id_field accordingly.

from nemo_curator.stages.image.deduplication.removal import ImageDuplicatesRemovalStage
from nemo_curator.stages.image.io.image_writer import ImageWriterStage


def create_image_removal_pipeline(input_dir, removal_dir, output_dir):
    """Create pipeline to remove duplicate images."""
    pipeline = Pipeline(
        name="image_deduplication",
        description="Remove duplicate images from dataset",
    )

    # Partition input files
    pipeline.add_stage(FilePartitioningStage(
        file_paths=input_dir,
        files_per_partition=1,
        file_extensions=[".tar"],
    ))

    # Read original images
    pipeline.add_stage(ImageReaderStage(
        batch_size=100,
        verbose=True,
        num_threads=16,
        num_gpus_per_worker=0.25,
    ))

    # Remove duplicates based on removal list
    pipeline.add_stage(ImageDuplicatesRemovalStage(
        removal_parquets_dir=removal_dir + "/duplicates",
        duplicate_id_field="id",
        verbose=True,
    ))

    # Write deduplicated dataset
    pipeline.add_stage(ImageWriterStage(
        output_dir=output_dir,
        remove_image_data=True,
        verbose=True,
    ))

    return pipeline

Run the Removal Pipeline

# Set paths
INPUT_TAR_DIR = "/path/to/input/tar_dataset"
REMOVAL_DIR = "/path/to/removal_ids"
OUTPUT_DIR = "/path/to/deduplicated/dataset"

# Run removal pipeline
removal_pipeline = create_image_removal_pipeline(
    INPUT_TAR_DIR, REMOVAL_DIR, OUTPUT_DIR
)
removal_pipeline.run()  # Uses XennaExecutor by default

4. Inspect Results

After deduplication, examine the results to understand what was removed:

Check Removal Statistics

import pandas as pd
from glob import glob

# Read removal results
removal_files = glob(f"{REMOVAL_DIR}/duplicates/*.parquet")
removal_dfs = [pd.read_parquet(f) for f in removal_files]
all_removals = pd.concat(removal_dfs, ignore_index=True)

print(f"Total images marked for removal: {len(all_removals)}")
print(f"Unique images marked for removal: {all_removals['id'].nunique()}")

# Show sample of removed images
print("\nSample removed image IDs:")
print(all_removals['id'].head(10).tolist())
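
You can also spot-check that flagged IDs are absent from the output archives. This sketch assumes the WebDataset convention where a tar member's file name, minus its extension, is the image ID; adjust the parsing if your dataset names members differently. It reuses all_removals from the snippet above:

import tarfile
from glob import glob

removed_ids = set(all_removals["id"].astype(str))

# Inspect the first output shard
first_shard = sorted(glob(f"{OUTPUT_DIR}/*.tar"))[0]
with tarfile.open(first_shard) as tar:
    member_ids = {m.name.rsplit(".", 1)[0] for m in tar.getmembers()}

print(f"Removed IDs still present in first shard: {len(removed_ids & member_ids)}")  # expect 0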

Compare Dataset Sizes

import os

def count_tar_files(directory):
    """Count tar files in a directory."""
    tar_files = glob(os.path.join(directory, "*.tar"))
    return len(tar_files)

original_count = count_tar_files(INPUT_TAR_DIR)
deduplicated_count = count_tar_files(OUTPUT_DIR)

print(f"Original dataset: {original_count} tar files")
print(f"Deduplicated dataset: {deduplicated_count} tar files")
print(f"Reduction: {original_count - deduplicated_count} files ({(1 - deduplicated_count / original_count) * 100:.1f}%)")

5. Complete Workflow Script

Here’s the complete workflow that combines all steps:

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.deduplication.semantic import SemanticDeduplicationWorkflow
from nemo_curator.stages.file_partitioning import FilePartitioningStage
from nemo_curator.stages.image.deduplication.removal import ImageDuplicatesRemovalStage
from nemo_curator.stages.image.embedders.clip_embedder import ImageEmbeddingStage
from nemo_curator.stages.image.io.convert import ConvertImageBatchToDocumentBatchStage
from nemo_curator.stages.image.io.image_reader import ImageReaderStage
from nemo_curator.stages.image.io.image_writer import ImageWriterStage
from nemo_curator.stages.text.io.writer.parquet import ParquetWriter


def run_image_deduplication_workflow():
    """Run complete image deduplication workflow."""
    # Define paths
    INPUT_TAR_DIR = "/path/to/input/tar_dataset"
    EMBEDDINGS_DIR = "/path/to/embeddings"
    REMOVAL_DIR = "/path/to/removal_ids"
    OUTPUT_DIR = "/path/to/deduplicated/dataset"
    MODEL_DIR = "/path/to/models"

    print("Step 1: Generating embeddings...")

    # Step 1: Generate embeddings
    embedding_pipeline = Pipeline(name="embedding", description="Generate embeddings")
    embedding_pipeline.add_stage(FilePartitioningStage(
        file_paths=INPUT_TAR_DIR, files_per_partition=1, file_extensions=[".tar"]
    ))
    embedding_pipeline.add_stage(ImageReaderStage(
        batch_size=100, verbose=True, num_threads=16, num_gpus_per_worker=0.25
    ))
    embedding_pipeline.add_stage(ImageEmbeddingStage(
        model_dir=MODEL_DIR, num_gpus_per_worker=0.25,
        model_inference_batch_size=32, verbose=True
    ))
    embedding_pipeline.add_stage(ConvertImageBatchToDocumentBatchStage(
        fields=["image_id", "embedding"]
    ))
    embedding_pipeline.add_stage(ParquetWriter(path=EMBEDDINGS_DIR))

    embedding_pipeline.run()  # Uses XennaExecutor by default

    print("Step 2: Running semantic deduplication...")

    # Step 2: Semantic deduplication
    dedup_workflow = SemanticDeduplicationWorkflow(
        input_path=EMBEDDINGS_DIR,
        output_path=REMOVAL_DIR,
        id_field="image_id",
        embedding_field="embedding",
        n_clusters=100,
        eps=0.01,
        verbose=True,
    )
    dedup_workflow.run()

    print("Step 3: Removing duplicate images...")

    # Step 3: Remove duplicates
    removal_pipeline = Pipeline(name="removal", description="Remove duplicates")
    removal_pipeline.add_stage(FilePartitioningStage(
        file_paths=INPUT_TAR_DIR, files_per_partition=1, file_extensions=[".tar"]
    ))
    removal_pipeline.add_stage(ImageReaderStage(
        batch_size=100, verbose=True, num_threads=16, num_gpus_per_worker=0.25
    ))
    removal_pipeline.add_stage(ImageDuplicatesRemovalStage(
        removal_parquets_dir=REMOVAL_DIR + "/duplicates",
        duplicate_id_field="id",
        verbose=True,
    ))
    removal_pipeline.add_stage(ImageWriterStage(
        output_dir=OUTPUT_DIR, remove_image_data=True, verbose=True
    ))

    removal_pipeline.run()  # Uses XennaExecutor by default

    print(f"Deduplication complete! Results saved to: {OUTPUT_DIR}")


if __name__ == "__main__":
    run_image_deduplication_workflow()

Next Steps

After running image deduplication:

  1. Quality assessment: Manually review a sample of removed duplicates
  2. Combine with filtering: Run aesthetic/NSFW filtering on deduplicated data
  3. Export for training: Prepare final curated dataset for ML training
  4. Monitor metrics: Track deduplication rates across different image types