Adding Custom Stages

Learn how to customize NeMo Curator by adding new pipeline stages.

NeMo Curator ships a set of pipelines with default stages, but those stages might not cover every requirement. This tutorial shows how to write a new pipeline stage and integrate it into a pipeline.

Before You Start

Before you begin adding a new pipeline stage, make sure that you have:

How to Add a Custom Pipeline Stage

1. Define the Stage Class

from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.resources import Resources
from nemo_curator.tasks.video import VideoTask


class MyCustomStage(ProcessingStage[VideoTask, VideoTask]):
    """Example stage that reads and writes to the VideoTask."""

    name = "my_custom_stage"
    resources = Resources(cpus=2.0, gpu_memory_gb=8.0)

    def setup(self, worker_metadata=None) -> None:
        # Initialize models or allocate resources here
        pass

    def process(self, task: VideoTask) -> VideoTask | list[VideoTask]:
        # Implement your processing and return the modified task (or list of tasks)
        return task

2. Specify Resource Requirements

# You can override resources at construction time using with_()
from nemo_curator.stages.resources import Resources

stage = MyCustomStage().with_(
    resources=Resources(cpus=4.0, gpu_memory_gb=16.0)
)
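
The override pattern can be illustrated without the library. The following is a minimal sketch of one way a `with_()`-style method could behave: class-level defaults stay untouched while the configured instance carries the new values. `FakeResources` and `FakeStage` are hypothetical stand-ins, and the copy-then-shadow behavior is an assumption of this sketch, not a description of how NeMo Curator implements `with_()`.

```python
from __future__ import annotations

import copy
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeResources:
    """Hypothetical stand-in for Resources with the two fields used above."""
    cpus: float = 1.0
    gpu_memory_gb: float = 0.0


class FakeStage:
    """Hypothetical stage with class-level defaults, as in step 1."""
    name = "my_custom_stage"
    resources = FakeResources(cpus=2.0, gpu_memory_gb=8.0)

    def with_(
        self,
        *,
        name: str | None = None,
        resources: FakeResources | None = None,
    ) -> FakeStage:
        # Assumed behavior for this sketch: copy the stage, then set instance
        # attributes that shadow the class-level defaults.
        new_stage = copy.copy(self)
        if name is not None:
            new_stage.name = name
        if resources is not None:
            new_stage.resources = resources
        return new_stage


default_stage = FakeStage()
big_stage = default_stage.with_(
    resources=FakeResources(cpus=4.0, gpu_memory_gb=16.0)
)
```

In this sketch, `default_stage` keeps the class defaults, so the same stage class can be reused with different resource requests in different pipelines.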

3. Implement Core Methods

Required methods for every stage:

Setup Method

def setup(self, worker_metadata=None) -> None:
    # Load models, warm up caches, etc.
    pass

Process Data Method

def process(self, task: VideoTask) -> VideoTask | list[VideoTask]:
    # Process implementation
    return task
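
To make the contract between the two methods concrete, here is a minimal, library-free sketch of how a driver might call them: `setup` once, then `process` once per task, flattening any list results. `FakeTask`, `UppercaseStage`, and `run_stage` are hypothetical names used only for illustration; the real executor's scheduling and worker handling are more involved.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class FakeTask:
    """Hypothetical stand-in for VideoTask: an id and a text payload."""
    task_id: str
    payload: str


class UppercaseStage:
    """Hypothetical stage following the setup/process shape shown above."""

    def __init__(self) -> None:
        self.setup_calls = 0
        self.suffix = ""

    def setup(self, worker_metadata=None) -> None:
        # One-time initialization (a real stage might load a model here).
        self.setup_calls += 1
        self.suffix = "!"

    def process(self, task: FakeTask) -> FakeTask | list[FakeTask]:
        # Return one task, or a list when one input fans out into several.
        words = task.payload.split()
        if len(words) <= 1:
            return FakeTask(task.task_id, task.payload.upper() + self.suffix)
        return [
            FakeTask(f"{task.task_id}-{i}", word.upper() + self.suffix)
            for i, word in enumerate(words)
        ]


def run_stage(stage: UppercaseStage, tasks: list[FakeTask]) -> list[FakeTask]:
    """Assumed driver loop: setup once, process each task, flatten results."""
    stage.setup()
    results: list[FakeTask] = []
    for task in tasks:
        out = stage.process(task)
        results.extend(out if isinstance(out, list) else [out])
    return results


stage = UppercaseStage()
outputs = run_stage(stage, [FakeTask("a", "hello"), FakeTask("b", "two words")])
```

Keeping expensive initialization in `setup` rather than `process` means it runs once instead of once per task, which matters when the stage loads a model.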

4. Update Data Model

Modify the pipeline’s data model to include your stage’s outputs:

# In NeMo Curator, video data lives in VideoTask.data (a Video), which contains Clips.
# You can attach new information to existing structures (for example, store derived
# arrays in clip.egomotion or add keys to dictionaries), or maintain your own
# data alongside and write it in a custom writer stage.
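
The "attach to existing structures" option can be sketched as follows. The classes below are hypothetical stand-ins that only mirror the task, video, and clip nesting described above; in particular, the `extras` dictionary and the `span` field are assumptions made for this example and are not confirmed fields of the library's `Clip`.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FakeClip:
    """Hypothetical stand-in for a Clip: a time span plus a free-form dict."""
    span: tuple[float, float]
    extras: dict[str, float] = field(default_factory=dict)  # assumed field


@dataclass
class FakeVideo:
    """Hypothetical stand-in for Video: a container of clips."""
    clips: list[FakeClip] = field(default_factory=list)


@dataclass
class FakeVideoTask:
    """Hypothetical stand-in for VideoTask: wraps a video in `data`."""
    data: FakeVideo


def annotate_durations(task: FakeVideoTask) -> FakeVideoTask:
    """Attach a derived value to each clip instead of changing the schema."""
    for clip in task.data.clips:
        start, end = clip.span
        clip.extras["duration_s"] = end - start
    return task


task = FakeVideoTask(FakeVideo([FakeClip((0.0, 2.5)), FakeClip((2.5, 4.0))]))
annotate_durations(task)
```

Adding a key to an existing dictionary keeps downstream stages working unchanged, at the cost of a less explicit schema than a dedicated field.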

5. Modify Pipeline Output Handling

Update the ClipWriterStage to handle your stage’s output:

  1. Create a writer method:

    def _write_custom_output(self, clip: Clip) -> None:
        # writing implementation
        ...
  2. Add to the main process:

    def process(self, task: VideoTask) -> VideoTask | list[VideoTask]:
        # existing processing
        for clip in task.data.clips:  # assumes the Video exposes its clips as `clips`
            self._write_custom_output(clip)
        # continue processing
        return task
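
As an illustration of the two steps above, the following sketch writes one JSON file per clip. `FakeClip`, `FakeClipWriter`, the `extras` field, and the `custom/` subdirectory are all hypothetical choices for this example; the actual `ClipWriterStage` has its own layout and helpers, which you should follow when editing it.

```python
from __future__ import annotations

import json
import tempfile
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class FakeClip:
    """Hypothetical stand-in for a Clip with an id and custom stage output."""
    clip_id: str
    extras: dict[str, float] = field(default_factory=dict)


class FakeClipWriter:
    """Hypothetical writer; only the custom-output path is sketched."""

    def __init__(self, output_path: str) -> None:
        self.output_dir = Path(output_path) / "custom"  # assumed layout

    def _write_custom_output(self, clip: FakeClip) -> Path:
        # One JSON file per clip, named after the clip id.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        dest = self.output_dir / f"{clip.clip_id}.json"
        dest.write_text(json.dumps(clip.extras, sort_keys=True))
        return dest

    def process(self, clips: list[FakeClip]) -> list[Path]:
        # Skip clips that carry no custom output.
        return [self._write_custom_output(c) for c in clips if c.extras]


with tempfile.TemporaryDirectory() as tmp:
    writer = FakeClipWriter(tmp)
    written = writer.process([FakeClip("c0", {"duration_s": 2.5}), FakeClip("c1")])
    names = [p.name for p in written]
    loaded = json.loads(written[0].read_text())
```

Skipping clips with no custom output avoids writing empty files when an upstream stage filtered or failed on a clip.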

Integration Steps

1. Build and Run a Pipeline in Python

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.video.io.video_reader import VideoReader
from nemo_curator.stages.video.io.clip_writer import ClipWriterStage

pipeline = (
    Pipeline(name="custom-video-pipeline")
    .add_stage(VideoReader(input_video_path="/path/to/videos", video_limit=10))
    .add_stage(MyCustomStage())
    .add_stage(
        ClipWriterStage(
            output_path="/path/to/output",
            input_path="/path/to/videos",
            upload_clips=True,
            dry_run=False,
            generate_embeddings=False,
            generate_previews=False,
            generate_captions=False,
            embedding_algorithm="cosmos-embed1-224p",
            caption_models=["qwen"],
            enhanced_caption_models=["qwen_lm"],
        )
    )
)

# Optionally provide an executor; defaults to XennaExecutor
pipeline.run()
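
The chained calls above work because each `add_stage` call returns the pipeline, and the order of the calls determines the order in which stages run. The toy `MiniPipeline` below is a hypothetical, sequential stand-in that shows only this ordering; it does not model executors, resources, or distributed execution.

```python
from __future__ import annotations

from typing import Callable

# A toy "stage" is any function from a list of items to a list of items.
Stage = Callable[[list[str]], list[str]]


class MiniPipeline:
    """Hypothetical sequential pipeline used only to illustrate chaining."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.stages: list[tuple[str, Stage]] = []

    def add_stage(self, label: str, stage: Stage) -> MiniPipeline:
        # Returning self is what allows the fluent .add_stage(...) chain.
        self.stages.append((label, stage))
        return self

    def run(self, items: list[str]) -> list[str]:
        # Each stage consumes the previous stage's output, in insertion order.
        for _label, stage in self.stages:
            items = stage(items)
        return items


pipeline = (
    MiniPipeline("custom-video-pipeline")
    .add_stage("reader", lambda _: ["a.mp4", "b.mp4"])
    .add_stage("custom", lambda items: [f"{item}:processed" for item in items])
    .add_stage("writer", lambda items: [f"wrote {item}" for item in items])
)
result = pipeline.run([])
```

Placing the custom stage between the reader and the writer matters: the writer can only persist what earlier stages have already attached to the task.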

2. Refer to Examples

For end-to-end usage, review and adapt the example:

  • examples/video/video_split_clip_example.py

3. (Optional) Containerize Your Changes

If you need a container image, extend your base image using a Dockerfile and include your code and dependencies. Then build and run with your preferred container tooling.