Create a Video Pipeline

Learn the basics of creating a video pipeline in Curator by following a split-and-clip pipeline example.

Before You Start

  • Follow the Get Started guide to install the package, prepare the model directory, and set up your data paths.

Concepts and Mental Model

Use this overview to understand how stages pass data through the pipeline.

  • Pipeline: An ordered list of stages that process data.
  • Stage: A modular operation (for example, read, split, encode, embed, write).
  • Executor: Runs the pipeline (Ray/Xenna backend).
  • Data units: Input videos → clip windows → frames → embeddings + files.
  • Common choices:
    • Splitting: fixed stride vs. scene-change (TransNetV2)
    • Encoding: libopenh264, h264_nvenc, or libx264
    • Embeddings: Cosmos-Embed1
  • Outputs: Clips (mp4), previews (optional), and parquet embeddings for downstream tasks (such as semantic duplicate removal).

For more information, refer to the Video Concepts section.
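The mental model above can be sketched in plain Python. This is a toy illustration only: `ToyPipeline`, `split`, and `embed` are hypothetical stand-ins, not Curator classes, and the "embedding" is a placeholder number.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class ToyPipeline:
    """Hypothetical stand-in for a pipeline: an ordered list of stages."""

    name: str
    stages: List[Callable[[list], list]] = field(default_factory=list)

    def add_stage(self, stage: Callable[[list], list]) -> "ToyPipeline":
        self.stages.append(stage)
        return self

    def run(self, items: list) -> list:
        # Each stage consumes the previous stage's output, in order.
        for stage in self.stages:
            items = stage(items)
        return items


def split(videos: list) -> list:
    # One video fans out into several clip labels (two per video here).
    return [f"{video}#clip{i}" for video in videos for i in range(2)]


def embed(clips: list) -> list:
    # Pair each clip with a placeholder "embedding" (its label length).
    return [(clip, len(clip)) for clip in clips]


toy = ToyPipeline(name="toy").add_stage(split).add_stage(embed)
result = toy.run(["a.mp4"])
```

The point of the sketch is the data flow: one input video becomes several clips, and each clip picks up derived data as it moves through later stages.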


1. Define Imports and Paths

Import required classes and define paths used throughout the example.

from nemo_curator.pipeline import Pipeline

from nemo_curator.stages.video.io.video_reader import VideoReader
from nemo_curator.stages.video.clipping.clip_extraction_stages import (
    FixedStrideExtractorStage,
    ClipTranscodingStage,
)
from nemo_curator.stages.video.clipping.clip_frame_extraction import (
    ClipFrameExtractionStage,
)
from nemo_curator.utils.decoder_utils import FrameExtractionPolicy, FramePurpose
from nemo_curator.stages.video.embedding.cosmos_embed1 import (
    CosmosEmbed1FrameCreationStage,
    CosmosEmbed1EmbeddingStage,
)
from nemo_curator.stages.video.io.clip_writer import ClipWriterStage

VIDEO_DIR = "/path/to/videos"
MODEL_DIR = "/path/to/models"
OUT_DIR = "/path/to/output_clips"
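Before building the pipeline, it can help to confirm that the input path exists and actually contains videos. The helper below is a minimal sketch using only the standard library; `find_videos` and the `VIDEO_EXTENSIONS` set are hypothetical names chosen for illustration, not part of Curator.

```python
from pathlib import Path

# Assumed extension set for illustration; adjust it to match your data.
VIDEO_EXTENSIONS = {".mp4", ".mov", ".mkv", ".avi", ".webm"}


def find_videos(video_dir: str) -> list:
    """Return sorted video files under video_dir, or raise if it is missing."""
    root = Path(video_dir)
    if not root.is_dir():
        raise FileNotFoundError(f"Video directory not found: {root}")
    return sorted(
        path
        for path in root.rglob("*")
        if path.is_file() and path.suffix.lower() in VIDEO_EXTENSIONS
    )
```

Calling `find_videos(VIDEO_DIR)` before adding stages surfaces a wrong path early, rather than partway through a run.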

2. Create the Pipeline

Instantiate a named pipeline to orchestrate the stages.

pipeline = Pipeline(name="video_splitting", description="Split videos into clips")

3. Define Stages

Add modular stages to read, split, encode, extract frames, embed, and write outputs.

Read Input Videos

Read videos from storage and extract metadata to prepare for clipping.

pipeline.add_stage(
    VideoReader(input_video_path=VIDEO_DIR, video_limit=None, verbose=True)
)

Split into Clips

Create clip windows using fixed intervals or scene-change detection.

pipeline.add_stage(
    FixedStrideExtractorStage(
        clip_len_s=10.0,
        clip_stride_s=10.0,
        min_clip_length_s=2.0,
        limit_clips=0,
    )
)
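To build intuition for how the fixed-stride parameters interact, the sketch below computes clip windows for a video of a given duration. It is an illustration under stated assumptions, not the stage's actual implementation: it assumes the final window is truncated at the end of the video, that windows shorter than `min_clip_length_s` are dropped, and that `limit_clips=0` means no limit. The function name `fixed_stride_windows` is hypothetical.

```python
def fixed_stride_windows(
    duration_s: float,
    clip_len_s: float = 10.0,
    clip_stride_s: float = 10.0,
    min_clip_length_s: float = 2.0,
    limit_clips: int = 0,
) -> list:
    """Return (start, end) windows in seconds under the assumptions above."""
    if clip_stride_s <= 0:
        raise ValueError("clip_stride_s must be positive")
    windows = []
    start = 0.0
    while start < duration_s:
        # Assumption: the last window is cut off at the end of the video.
        end = min(start + clip_len_s, duration_s)
        # Assumption: windows shorter than the minimum length are dropped.
        if end - start >= min_clip_length_s:
            windows.append((start, end))
        # Assumption: a limit of 0 means "no limit".
        if limit_clips > 0 and len(windows) >= limit_clips:
            break
        start += clip_stride_s
    return windows
```

Under these assumptions, a 25-second video yields three windows, the last one 5 seconds long, while a 21-second video yields two, because the trailing 1-second remainder falls below the 2-second minimum. A stride smaller than the clip length would produce overlapping windows.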

Encode Clips

Convert clip buffers to H.264 using the selected encoder and settings. Refer to Clip Encoding for encoder choices and NVENC setup.

pipeline.add_stage(
    ClipTranscodingStage(
        num_cpus_per_worker=6.0,
        encoder="libopenh264",
        encoder_threads=1,
        encode_batch_size=16,
        use_hwaccel=False,
        use_input_bit_rate=False,
        num_clips_per_chunk=32,
        verbose=True,
    )
)
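Which encoder to pass depends on what your FFmpeg build provides. The sketch below picks the first available name from a preference list by scanning the text printed by `ffmpeg -hide_banner -encoders`. The helper names and the preference order are assumptions made for illustration, and being listed by FFmpeg does not guarantee that NVENC hardware is usable on the machine.

```python
import shutil
import subprocess


def read_ffmpeg_encoders() -> str:
    """Return the encoder listing from ffmpeg, or "" if ffmpeg is not on PATH."""
    if shutil.which("ffmpeg") is None:
        return ""
    completed = subprocess.run(
        ["ffmpeg", "-hide_banner", "-encoders"],
        capture_output=True,
        text=True,
        check=False,
    )
    return completed.stdout


def pick_encoder(listing: str, preferred=("h264_nvenc", "libopenh264", "libx264")):
    """Return the first preferred encoder that appears in the listing, else None."""
    # Encoder rows have a flags column followed by the encoder name.
    listed = {
        parts[1]
        for parts in (line.split() for line in listing.splitlines())
        if len(parts) >= 2
    }
    for name in preferred:
        if name in listed:
            return name
    return None
```

A possible use is `encoder = pick_encoder(read_ffmpeg_encoders())`, then passing the result as the `encoder` argument after checking that it is not `None`.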

Prepare Frames for Embeddings (Optional)

Extract frames at target rates for downstream embedding models.

pipeline.add_stage(
    ClipFrameExtractionStage(
        extraction_policies=(FrameExtractionPolicy.sequence,),
        extract_purposes=(FramePurpose.EMBEDDINGS,),
        target_res=(-1, -1),  # no resize
        verbose=True,
    )
)

Generate Embeddings (Cosmos-Embed1)

Create Cosmos-Embed1-ready frames and compute clip-level embeddings.

pipeline.add_stage(
    CosmosEmbed1FrameCreationStage(model_dir=MODEL_DIR, target_fps=2.0, verbose=True)
)
pipeline.add_stage(
    CosmosEmbed1EmbeddingStage(model_dir=MODEL_DIR, gpu_memory_gb=20.0, verbose=True)
)
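To get a feel for what a target frame rate means for a clip, the sketch below lists evenly spaced sample times. It is a simplified illustration, not the stage's sampling logic, which may select or pad frames differently; `sample_timestamps` is a hypothetical helper.

```python
def sample_timestamps(duration_s: float, target_fps: float) -> list:
    """Return evenly spaced sample times (seconds) for a clip at target_fps."""
    if target_fps <= 0:
        raise ValueError("target_fps must be positive")
    # Number of whole sampling intervals that fit inside the clip.
    frame_count = int(duration_s * target_fps)
    return [index / target_fps for index in range(frame_count)]
```

With this simple scheme, a 10-second clip sampled at 2 frames per second gives 20 sample times, from 0.0 to 9.5 seconds.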

Write Clips and Metadata

Write clips, embeddings, and metadata to the output directory. Refer to Save & Export for a full list of parameters.

pipeline.add_stage(
    ClipWriterStage(
        output_path=OUT_DIR,
        input_path=VIDEO_DIR,
        upload_clips=True,
        dry_run=False,
        generate_embeddings=True,
        generate_previews=False,
        generate_captions=False,
        embedding_algorithm="cosmos-embed1",
        caption_models=[],
        enhanced_caption_models=[],
        verbose=True,
    )
)
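The written embeddings feed downstream tasks such as semantic duplicate removal. As a rough sketch of the idea, the code below flags clip pairs whose embeddings have high cosine similarity. It is a brute-force illustration in plain Python, not Curator's deduplication workflow; the function names, the `threshold` value, and the in-memory dictionary of embeddings are all assumptions.

```python
import math


def cosine_similarity(a: list, b: list) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either is zero)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def find_near_duplicates(embeddings: dict, threshold: float = 0.95) -> list:
    """Return (id, id) pairs whose similarity meets the threshold."""
    clip_ids = sorted(embeddings)
    pairs = []
    # Compare every unordered pair once; fine for small sets only.
    for index, first in enumerate(clip_ids):
        for second in clip_ids[index + 1:]:
            score = cosine_similarity(embeddings[first], embeddings[second])
            if score >= threshold:
                pairs.append((first, second))
    return pairs
```

The pairwise loop grows quadratically with the number of clips, so it is only suitable for spot checks on a small sample.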

4. Run the Pipeline

Run the configured pipeline. Called with no arguments, as shown here, run() uses the pipeline's default executor.

pipeline.run()
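After the run finishes, a quick sanity check is to count what landed in the output directory. The sketch below tallies files by extension using only the standard library. It makes no assumption about the writer's directory layout, and `summarize_outputs` is a hypothetical helper.

```python
from collections import Counter
from pathlib import Path


def summarize_outputs(out_dir: str) -> dict:
    """Count files under out_dir, grouped by lowercase file extension."""
    counts = Counter(
        path.suffix.lower() or "<none>"
        for path in Path(out_dir).rglob("*")
        if path.is_file()
    )
    return dict(counts)
```

For example, `summarize_outputs(OUT_DIR)` returns a dictionary mapping each extension to a file count, which makes it easy to spot a run that wrote clips but no embeddings.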