API Reference — Tasks

VideoTask

View as Markdown

VideoTask is the task type for video processing in NeMo Curator.

Import

from nemo_curator.tasks import VideoTask

Class Definition

from dataclasses import dataclass
from nemo_curator.tasks.video import Video

@dataclass
class VideoTask(Task[Video]):
    """Task containing a single video for processing.

    Attributes:
        task_id: Unique identifier for this task.
        dataset_name: Name of the source dataset.
        data: Video object with path and metadata.
    """

    task_id: str
    dataset_name: str
    data: Video

Video Class

The video data is represented by a Video object:

@dataclass
class Video:
    """Represents a video file with metadata.

    Attributes:
        path: Path to the video file.
        start_time: Start time in seconds (for clips).
        end_time: End time in seconds (for clips).
        metadata: Additional metadata dictionary.
        embeddings: Optional embedding vector.
    """

    path: str
    start_time: float | None = None
    end_time: float | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
    embeddings: np.ndarray | None = None

Properties

num_items

Get the number of items (always 1 for VideoTask).

@property
def num_items(self) -> int:
    """Returns 1 (VideoTask represents a single video)."""

Creating VideoTask

from nemo_curator.tasks import VideoTask
from nemo_curator.tasks.video import Video

# Create a video object
video = Video(
    path="/data/videos/video1.mp4",
    start_time=0.0,
    end_time=30.0,
    metadata={
        "duration": 120.5,
        "fps": 30,
        "resolution": "1920x1080",
    },
)

# Create task
task = VideoTask(
    task_id="video_001",
    dataset_name="video_dataset",
    data=video,
)

Usage in Stages

from dataclasses import dataclass
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import VideoTask

@dataclass
class VideoFilterStage(ProcessingStage[VideoTask, VideoTask]):
    """Filter videos based on duration."""

    name: str = "VideoFilter"
    min_duration: float = 5.0
    max_duration: float = 300.0

    def inputs(self) -> tuple[list[str], list[str]]:
        return ["data"], []

    def outputs(self) -> tuple[list[str], list[str]]:
        return ["data"], []

    def process(self, task: VideoTask) -> VideoTask | None:
        video = task.data
        duration = video.metadata.get("duration", 0)

        if not (self.min_duration <= duration <= self.max_duration):
            return None

        return VideoTask(
            task_id=f"{task.task_id}_filtered",
            dataset_name=task.dataset_name,
            data=video,
            _metadata=task._metadata,
            _stage_perf=task._stage_perf,
        )

Common Operations

Splitting Videos into Clips

The video splitting stages return multiple VideoTask objects:

def process(self, task: VideoTask) -> list[VideoTask]:
    clips = []
    video = task.data

    for i, (start, end) in enumerate(self._compute_splits(video)):
        clip_video = Video(
            path=video.path,
            start_time=start,
            end_time=end,
            metadata=video.metadata.copy(),
        )
        clips.append(VideoTask(
            task_id=f"{task.task_id}_clip_{i}",
            dataset_name=task.dataset_name,
            data=clip_video,
            _metadata=task._metadata,
            _stage_perf=task._stage_perf,
        ))

    return clips

Adding Embeddings

def process(self, task: VideoTask) -> VideoTask:
    video = task.data
    video.embeddings = self.encoder.encode(video.path)

    return VideoTask(
        task_id=f"{task.task_id}_{self.name}",
        dataset_name=task.dataset_name,
        data=video,
        _metadata=task._metadata,
        _stage_perf=task._stage_perf,
    )

Source Code

View source on GitHub