VideoTask
VideoTask is the task type for video processing in NeMo Curator.
Import
```python
from nemo_curator.tasks import VideoTask
```
Class Definition
```python
from dataclasses import dataclass
from nemo_curator.tasks.video import Video

@dataclass
class VideoTask(Task[Video]):
    """Task containing a single video for processing.

    Attributes:
        task_id: Unique identifier for this task.
        dataset_name: Name of the source dataset.
        data: Video object with path and metadata.
    """

    task_id: str
    dataset_name: str
    data: Video
```
Video Class
The video data is represented by a Video object:
```python
from dataclasses import dataclass, field
from typing import Any

import numpy as np

@dataclass
class Video:
    """Represents a video file with metadata.

    Attributes:
        path: Path to the video file.
        start_time: Start time in seconds (for clips).
        end_time: End time in seconds (for clips).
        metadata: Additional metadata dictionary.
        embeddings: Optional embedding vector.
    """

    path: str
    start_time: float | None = None
    end_time: float | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
    embeddings: np.ndarray | None = None
```
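As a minimal sketch of how the clip fields might be read, the example below defines a local stand-in for `Video` (so it runs without NeMo Curator installed) and a helper named `clip_duration`. Both the stand-in and the helper are illustrative assumptions, not part of the library. The sketch treats a missing `start_time` or `end_time` as "not a bounded clip".

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Video:
    """Local stand-in mirroring the documented fields (embeddings omitted)."""

    path: str
    start_time: float | None = None
    end_time: float | None = None
    metadata: dict[str, Any] = field(default_factory=dict)


def clip_duration(video: Video) -> float | None:
    """Hypothetical helper: clip length in seconds, or None if unbounded."""
    if video.start_time is None or video.end_time is None:
        return None
    return video.end_time - video.start_time


full = Video(path="/data/videos/video1.mp4")
clip = Video(path="/data/videos/video1.mp4", start_time=10.0, end_time=25.5)

print(clip_duration(full))  # None
print(clip_duration(clip))  # 15.5

# default_factory gives every instance its own metadata dict.
print(full.metadata is clip.metadata)  # False
```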
Properties
num_items
Returns the number of items in the task. A VideoTask holds a single video, so this is always 1.
```python
@property
def num_items(self) -> int:
    """Returns 1 (VideoTask represents a single video)."""
    return 1
```
Creating VideoTask
```python
from nemo_curator.tasks import VideoTask
from nemo_curator.tasks.video import Video

# Create a video object
video = Video(
    path="/data/videos/video1.mp4",
    start_time=0.0,
    end_time=30.0,
    metadata={
        "duration": 120.5,
        "fps": 30,
        "resolution": "1920x1080",
    },
)

# Create task
task = VideoTask(
    task_id="video_001",
    dataset_name="video_dataset",
    data=video,
)
```
Usage in Stages
```python
from dataclasses import dataclass
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import VideoTask

@dataclass
class VideoFilterStage(ProcessingStage[VideoTask, VideoTask]):
    """Filter videos based on duration."""

    name: str = "VideoFilter"
    min_duration: float = 5.0
    max_duration: float = 300.0

    def inputs(self) -> tuple[list[str], list[str]]:
        return ["data"], []

    def outputs(self) -> tuple[list[str], list[str]]:
        return ["data"], []

    def process(self, task: VideoTask) -> VideoTask | None:
        video = task.data
        duration = video.metadata.get("duration", 0)

        if not (self.min_duration <= duration <= self.max_duration):
            return None

        return VideoTask(
            task_id=f"{task.task_id}_filtered",
            dataset_name=task.dataset_name,
            data=video,
            _metadata=task._metadata,
            _stage_perf=task._stage_perf,
        )
```
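The duration check in `process` can be exercised on its own. The sketch below copies that predicate into a standalone function, `passes_duration_filter`, which is a hypothetical name used only for illustration. It shows one consequence of the `metadata.get("duration", 0)` default: a video with no `"duration"` entry is treated as zero-length and is dropped whenever `min_duration` is positive.

```python
from typing import Any


def passes_duration_filter(
    metadata: dict[str, Any],
    min_duration: float = 5.0,
    max_duration: float = 300.0,
) -> bool:
    """Hypothetical standalone copy of the stage's duration predicate."""
    # A missing "duration" key falls back to 0, as in the stage above.
    duration = metadata.get("duration", 0)
    return min_duration <= duration <= max_duration


print(passes_duration_filter({"duration": 120.5}))  # True
print(passes_duration_filter({"duration": 2.0}))    # False (too short)
print(passes_duration_filter({"duration": 900.0}))  # False (too long)
print(passes_duration_filter({}))                   # False (no duration recorded)
```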
Common Operations
Splitting Videos into Clips
The video splitting stages return multiple VideoTask objects:
```python
def process(self, task: VideoTask) -> list[VideoTask]:
    clips = []
    video = task.data

    # _compute_splits is assumed to yield (start, end) pairs in seconds.
    for i, (start, end) in enumerate(self._compute_splits(video)):
        clip_video = Video(
            path=video.path,
            start_time=start,
            end_time=end,
            # Shallow copy, so each clip gets its own metadata dict.
            metadata=video.metadata.copy(),
        )
        clips.append(VideoTask(
            task_id=f"{task.task_id}_clip_{i}",
            dataset_name=task.dataset_name,
            data=clip_video,
            _metadata=task._metadata,
            _stage_perf=task._stage_perf,
        ))

    return clips
```
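The splitting example calls `self._compute_splits(video)` without showing it. One possible shape for such a helper, assuming fixed-length windows and a known total duration, is sketched below. The function name `compute_splits` and the `clip_length` parameter are hypothetical; an actual stage may choose boundaries differently (for example, by scene detection).

```python
from __future__ import annotations


def compute_splits(duration: float, clip_length: float = 10.0) -> list[tuple[float, float]]:
    """Hypothetical helper: cut [0, duration] into consecutive fixed-length windows.

    The last window is shortened so that it ends exactly at `duration`.
    """
    if duration <= 0 or clip_length <= 0:
        return []

    splits = []
    start = 0.0
    while start < duration:
        end = min(start + clip_length, duration)
        splits.append((start, end))
        start = end
    return splits


print(compute_splits(25.0))
# [(0.0, 10.0), (10.0, 20.0), (20.0, 25.0)]
```

Each `(start, end)` pair maps directly onto the `start_time` and `end_time` fields of a clip's `Video`.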
Adding Embeddings
```python
def process(self, task: VideoTask) -> VideoTask:
    video = task.data
    # self.encoder is assumed to be an embedding model owned by the stage.
    # This assignment mutates the Video object held by the input task.
    video.embeddings = self.encoder.encode(video.path)

    return VideoTask(
        task_id=f"{task.task_id}_{self.name}",
        dataset_name=task.dataset_name,
        data=video,
        _metadata=task._metadata,
        _stage_perf=task._stage_perf,
    )
```
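Because the example assigns to `video.embeddings` on the object taken from `task.data`, the input task's `Video` is modified as well. If that matters for a pipeline, one alternative is to build a new object with `dataclasses.replace`. The sketch below illustrates the difference using a local stand-in for `Video` and a made-up `fake_encode` function; a plain list stands in for the `np.ndarray`. None of these names come from NeMo Curator.

```python
from __future__ import annotations

from dataclasses import dataclass, field, replace
from typing import Any


@dataclass
class Video:
    """Local stand-in with only the fields needed here."""

    path: str
    metadata: dict[str, Any] = field(default_factory=dict)
    embeddings: list[float] | None = None  # stand-in for np.ndarray


def fake_encode(path: str) -> list[float]:
    """Hypothetical encoder: returns a tiny vector derived from the path."""
    return [float(len(path)), 0.0]


original = Video(path="a.mp4")

# Copy-on-write: the original keeps embeddings=None.
copied = replace(original, embeddings=fake_encode(original.path))
print(original.embeddings)  # None
print(copied.embeddings)    # [5.0, 0.0]

# replace() is shallow, so the metadata dict is still shared.
print(copied.metadata is original.metadata)  # True

# In-place assignment, as in the stage example, changes the original too.
alias = original
alias.embeddings = fake_encode(alias.path)
print(original.embeddings)  # [5.0, 0.0]
```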