
Split and Remove Duplicates Workflow


Learn how to run the splitting pipeline to generate clips and embeddings, then remove near-duplicate clips using semantic duplicate removal.


1. Generate Clips and Embeddings

Run the splitting example. Set VIDEO_DIR, OUT_DIR, and MODEL_DIR first.

python tutorials/video/getting-started/video_split_clip_example.py \
  --video-dir "$VIDEO_DIR" \
  --model-dir "$MODEL_DIR" \
  --output-clip-path "$OUT_DIR" \
  --splitting-algorithm fixed_stride \
  --fixed-stride-split-duration 10.0 \
  --embedding-algorithm cosmos-embed1-224p \
  --transcode-encoder libopenh264 \
  --verbose

Writer-related flags you can add:

  --no-upload-clips   # Do not write MP4 files
  --dry-run           # Write nothing; validate only

The pipeline writes embeddings under $OUT_DIR/ce1_embd_parquet/ when using Cosmos-Embed1.

Embedding Format Example

The pipeline writes embeddings to Parquet with two columns:

  • id: String UUID for the clip
  • embedding: List of float values with length equal to the model’s embedding dimension (for Cosmos-Embed1, 768)

$OUT_DIR/
  ce1_embd_parquet/
    1a2b3c4d-....parquet
    5e6f7g8h-....parquet
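
A quick way to sanity-check this schema is a small validation pass over the rows. The sketch below is stdlib-only and uses in-memory stand-in rows rather than an actual Parquet read (reading the real files would require pandas or pyarrow); the 768-dimension check matches Cosmos-Embed1.

```python
import uuid

EMBED_DIM = 768  # Cosmos-Embed1 embedding dimension

def validate_rows(rows, dim=EMBED_DIM):
    """Check that each row has a string UUID id and a float embedding of the expected length."""
    for row in rows:
        uuid.UUID(row["id"])  # raises ValueError if the id is not a valid UUID string
        emb = row["embedding"]
        if len(emb) != dim or not all(isinstance(x, float) for x in emb):
            raise ValueError(f"bad embedding for clip {row['id']}")
    return True

# Stand-in rows mimicking the Parquet schema (id, embedding)
rows = [{"id": str(uuid.uuid4()), "embedding": [0.0] * EMBED_DIM} for _ in range(3)]
print(validate_rows(rows))  # True
```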

2. Run Semantic Duplicate Removal

Use K-means clustering followed by pairwise similarity on the Parquet embeddings.

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.deduplication.semantic.kmeans import KMeansStage
from nemo_curator.stages.deduplication.semantic.pairwise import PairwiseStage

INPUT_PARQUET = f"{OUT_DIR}/ce1_embd_parquet"  # or s3://...
OUTPUT_DIR = f"{OUT_DIR}/semantic_dedup"

pipe = Pipeline(name="video_semantic_dedup", description="K-means + pairwise duplicate removal")

pipe.add_stage(
    KMeansStage(
        n_clusters=1000,
        id_field="id",
        embedding_field="embedding",
        input_path=INPUT_PARQUET,
        output_path=f"{OUTPUT_DIR}/kmeans",
        input_filetype="parquet",
        embedding_dim=768,  # Cosmos-Embed1 embedding dimension
        read_kwargs={"storage_options": None},
        write_kwargs={"storage_options": None},
    )
)

pipe.add_stage(
    PairwiseStage(
        id_field="id",
        embedding_field="embedding",
        input_path=f"{OUTPUT_DIR}/kmeans",
        output_path=f"{OUTPUT_DIR}/pairwise",
        which_to_keep="hard",  # or "easy" or "random"
        sim_metric="cosine",  # or "l2"
        pairwise_batch_size=1024,
        read_kwargs={"storage_options": None},
        write_kwargs={"storage_options": None},
    )
)

pipe.run()

  • which_to_keep selects the representative within each cluster: “hard” keeps outliers far from the centroid, “easy” keeps the item nearest to the centroid, and “random” ignores distance and picks randomly.
  • sim_metric sets the distance used for similarity: “cosine” uses cosine distance (1 − cosine similarity), while “l2” uses Euclidean distance.
  • pairwise_batch_size controls how many items are processed per GPU batch during pairwise similarity; larger values can be faster but require more GPU memory.
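
To make the which_to_keep and sim_metric semantics concrete, here is an illustrative stdlib-only sketch of representative selection within one cluster. This is not the library's implementation, only the selection logic the parameters describe; "random" is omitted to keep the example deterministic.

```python
import math

def cosine_distance(a, b):
    """1 - cosine similarity, as used by sim_metric='cosine'."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (na * nb)

def pick_representative(embeddings, centroid, which_to_keep="hard"):
    """Return the index of the cluster representative by distance to the centroid."""
    dists = [cosine_distance(e, centroid) for e in embeddings]
    if which_to_keep == "hard":  # outlier: farthest from the centroid
        return max(range(len(dists)), key=dists.__getitem__)
    if which_to_keep == "easy":  # nearest to the centroid
        return min(range(len(dists)), key=dists.__getitem__)
    raise ValueError(which_to_keep)

centroid = [1.0, 0.0]
embs = [[1.0, 0.1], [0.0, 1.0], [1.0, 0.0]]
print(pick_representative(embs, centroid, "hard"))  # 1 (orthogonal to the centroid)
print(pick_representative(embs, centroid, "easy"))  # 2 (same direction as the centroid)
```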


3. Inspect Results

  • K-means outputs per-cluster partitions under ${OUTPUT_DIR}/kmeans/.
  • Pairwise outputs per-cluster similarity files under ${OUTPUT_DIR}/pairwise/ with columns including id, max_id, and cosine_sim_score.
  • Use these to decide keep/remove policies or downstream sampling.
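
A keep/remove policy over the pairwise output can be as simple as a similarity threshold. The sketch below is a hypothetical stdlib-only example: the in-memory rows stand in for records read from ${OUTPUT_DIR}/pairwise/, and the 0.95 threshold is an assumption you should tune for your data.

```python
def flag_duplicates(rows, threshold=0.95):
    """Drop one member of each pair whose nearest-neighbor similarity exceeds the threshold."""
    # Tie-break on id so only one clip of a mutually-similar pair is removed.
    remove = {
        r["id"]
        for r in rows
        if r["cosine_sim_score"] >= threshold and r["id"] > r["max_id"]
    }
    keep = [r["id"] for r in rows if r["id"] not in remove]
    return keep, sorted(remove)

rows = [
    {"id": "clip-a", "max_id": "clip-b", "cosine_sim_score": 0.99},
    {"id": "clip-b", "max_id": "clip-a", "cosine_sim_score": 0.99},
    {"id": "clip-c", "max_id": "clip-a", "cosine_sim_score": 0.40},
]
keep, removed = flag_duplicates(rows)
print(keep)     # ['clip-a', 'clip-c']
print(removed)  # ['clip-b']
```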

4. Export for Training

After duplicate removal, export curated clips and metadata for training. Common video exports:

  • Parquet index + media files (mp4/webp) under ${OUT_DIR}
  • Tar archives (WebDataset-style) containing per-clip payloads and JSON/Parquet metadata

Video-specific pointers:

  • Use ClipWriterStage path helpers to locate outputs: nemo_curator/stages/video/io/clip_writer.py.
    • Processed videos: get_output_path_processed_videos(OUT_DIR)
    • Clip chunks and previews: get_output_path_processed_clip_chunks(OUT_DIR), get_output_path_previews(OUT_DIR)
    • Embeddings parquet: ${OUT_DIR}/ce1_embd_parquet

Example Export

The following example packages clips and minimal JSON metadata into tar files:

import io
import json
import os
import tarfile
from glob import glob

OUT_DIR = os.environ["OUT_DIR"]
clips_dir = os.path.join(OUT_DIR, "clips")  # adjust if a filtering path was used

def iter_clips(path):
    """Yield (clip_id, mp4_path) for every clip under the directory tree."""
    for p in glob(os.path.join(path, "**", "*.mp4"), recursive=True):
        clip_id = os.path.splitext(os.path.basename(p))[0]
        yield clip_id, p

def write_shards(items, out_dir, samples_per_shard=10000, name_width=5):
    """Group clips into tar shards of samples_per_shard items each."""
    os.makedirs(out_dir, exist_ok=True)
    shard, buf = 0, []
    for i, (clip_id, mp4_path) in enumerate(items, 1):
        buf.append((clip_id, mp4_path))
        if i % samples_per_shard == 0:
            _write_tar(shard, buf, out_dir, name_width)
            shard, buf = shard + 1, []
    if buf:
        _write_tar(shard, buf, out_dir, name_width)

def _write_tar(shard, records, out_dir, name_width):
    """Write one shard: each clip's MP4 plus a minimal JSON metadata member."""
    tar_name = f"{shard:0{name_width}d}.tar"  # zero-padded shard name, e.g. 00000.tar
    tar_path = os.path.join(out_dir, tar_name)
    with tarfile.open(tar_path, "w") as tf:
        for clip_id, mp4_path in records:
            tf.add(mp4_path, arcname=f"{clip_id}.mp4")
            info = tarfile.TarInfo(name=f"{clip_id}.json")
            payload = json.dumps({"id": clip_id}).encode("utf-8")
            info.size = len(payload)
            tf.addfile(info, fileobj=io.BytesIO(payload))

write_shards(iter_clips(clips_dir), os.path.join(OUT_DIR, "wds"))
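
After exporting, it is worth verifying that every MP4 member in each shard has a matching JSON metadata member. The stdlib sketch below builds a tiny synthetic shard so the check is self-contained; point verify_shard at your real ${OUT_DIR}/wds shards instead.

```python
import io
import json
import os
import tarfile
import tempfile

def verify_shard(tar_path):
    """Return the clip count, raising if any .mp4 member lacks a .json sibling."""
    with tarfile.open(tar_path) as tf:
        names = tf.getnames()
    clips = {os.path.splitext(n)[0] for n in names if n.endswith(".mp4")}
    metas = {os.path.splitext(n)[0] for n in names if n.endswith(".json")}
    missing = clips - metas
    if missing:
        raise ValueError(f"clips without metadata: {sorted(missing)}")
    return len(clips)

# Build a tiny synthetic shard to demonstrate the check
tmp = tempfile.mkdtemp()
tar_path = os.path.join(tmp, "00000.tar")
with tarfile.open(tar_path, "w") as tf:
    for clip_id in ("clip-a", "clip-b"):
        for ext, payload in ((".mp4", b"\x00fake-video"),
                             (".json", json.dumps({"id": clip_id}).encode("utf-8"))):
            info = tarfile.TarInfo(name=f"{clip_id}{ext}")
            info.size = len(payload)
            tf.addfile(info, fileobj=io.BytesIO(payload))

print(verify_shard(tar_path))  # 2
```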