
AudioTask


AudioTask is the task type for audio processing in NeMo Curator. Each AudioTask holds a single manifest entry as a dict, matching the convention used by VideoTask and FileGroupTask.

Import

```python
from nemo_curator.tasks import AudioTask
```

Class Definition

```python
from dataclasses import dataclass

@dataclass
class AudioTask(Task[dict]):
    """Task containing a single audio manifest entry for processing.

    Attributes:
        task_id: Unique identifier for this task.
        dataset_name: Name of the source dataset.
        data: Audio manifest entry (single dict, stored as _AttrDict).
    """

    task_id: str
    dataset_name: str
    data: dict  # _AttrDict subclass; supports attribute-style access
```

Audio Manifest Format

Audio data follows the NeMo manifest format:

```json
{
  "audio_filepath": "/path/to/audio.wav",
  "duration": 5.2,
  "text": "Transcription text...",
  "speaker": "speaker_001",
  "metadata": {
    "sample_rate": 16000,
    "channels": 1
  }
}
```
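Manifest files store one such JSON object per line (JSONL). A minimal sketch of parsing a manifest, with plain dicts standing in for `AudioTask` construction and invented file contents for illustration:

```python
import json
from io import StringIO

# Stand-in for an open manifest file: one JSON object per line (JSONL).
manifest_jsonl = StringIO(
    '{"audio_filepath": "/data/a.wav", "duration": 5.2, "text": "hello"}\n'
    '{"audio_filepath": "/data/b.wav", "duration": 2.1, "text": "world"}\n'
)

# Parse each line into a manifest entry; in a pipeline, each entry
# would become the `data` field of one AudioTask.
entries = [json.loads(line) for line in manifest_jsonl]
```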

Properties

num_items

Always returns 1 — each AudioTask holds exactly one manifest entry.

```python
@property
def num_items(self) -> int:
    """Returns 1."""
    return 1
```

Creating AudioTask

```python
from nemo_curator.tasks import AudioTask

# Single manifest entry
task = AudioTask(
    task_id="audio_001",
    dataset_name="speech_dataset",
    data={
        "audio_filepath": "/data/audio/sample.wav",
        "duration": 5.2,
        "text": "Hello world",
    },
)

# Access fields via attribute or dict style
task.data["audio_filepath"]  # "/data/audio/sample.wav"
task.data.audio_filepath     # "/data/audio/sample.wav"
```
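The attribute-style access comes from the `_AttrDict` wrapper. A minimal sketch of how such a dict subclass behaves (a stand-in for illustration, not NeMo Curator's actual implementation):

```python
class AttrDict(dict):
    """Minimal dict subclass that also exposes keys as attributes,
    mimicking the attribute-style access described above."""

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            # Unknown keys raise AttributeError, as attribute access expects.
            raise AttributeError(name) from None

entry = AttrDict({"audio_filepath": "/data/audio/sample.wav", "duration": 5.2})
```

Both `entry["audio_filepath"]` and `entry.audio_filepath` return the same value.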

Usage in Stages

All audio stages subclass ProcessingStage[AudioTask, AudioTask] directly — there is no intermediate base class.

CPU Stage (per-task processing)

```python
from dataclasses import dataclass

from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import AudioTask

@dataclass
class DurationFilterStage(ProcessingStage[AudioTask, AudioTask]):
    """Filter audio by duration."""

    name: str = "DurationFilter"
    min_duration: float = 1.0
    max_duration: float = 30.0

    def inputs(self) -> tuple[list[str], list[str]]:
        return ["data"], []

    def outputs(self) -> tuple[list[str], list[str]]:
        return ["data"], []

    def process(self, task: AudioTask) -> AudioTask | None:
        duration = task.data.get("duration", 0)
        if self.min_duration <= duration <= self.max_duration:
            return task
        return None
```
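The filtering contract is: return the task to keep it, return `None` to drop it. That logic can be exercised in isolation; the sketch below mirrors `DurationFilterStage.process` over plain dicts so it runs without NeMo Curator installed (entry contents invented for illustration):

```python
def keep(entry: dict, min_duration: float = 1.0, max_duration: float = 30.0):
    # Mirrors DurationFilterStage.process: pass the entry through when its
    # duration is in range, otherwise drop it by returning None.
    duration = entry.get("duration", 0)
    return entry if min_duration <= duration <= max_duration else None

too_short = {"audio_filepath": "/data/short.wav", "duration": 0.4}
in_range = {"audio_filepath": "/data/ok.wav", "duration": 5.2}
```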

Batch Stage (GPU/IO processing)

```python
@dataclass
class MyGpuStage(ProcessingStage[AudioTask, AudioTask]):
    """GPU stage using process_batch."""

    name: str = "MyGpuStage"

    def process(self, task: AudioTask) -> AudioTask:
        raise NotImplementedError("Use process_batch for GPU stages")

    def process_batch(self, tasks: list[AudioTask]) -> list[AudioTask]:
        # Batched GPU inference
        paths = [t.data["audio_filepath"] for t in tasks]
        results = self.model.infer(paths)
        for task, result in zip(tasks, results):
            task.data["pred_text"] = result
        return tasks
```
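The gather-infer-scatter pattern in `process_batch` can be tested without a GPU by mocking the model call. A self-contained sketch, with plain dicts standing in for tasks and an invented `infer` callable standing in for the model:

```python
def attach_predictions(entries: list[dict], infer) -> list[dict]:
    # Mirrors the process_batch loop: one batched inference call over all
    # paths, then each result is written back onto its originating entry.
    paths = [e["audio_filepath"] for e in entries]
    results = infer(paths)
    for entry, result in zip(entries, results):
        entry["pred_text"] = result
    return entries

# Mock batched inference: one fake transcript per input path.
entries = [{"audio_filepath": "/data/a.wav"}, {"audio_filepath": "/data/b.wav"}]
attach_predictions(entries, lambda paths: [f"transcript of {p}" for p in paths])
```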

Common Operations

ASR Transcription

```python
def process(self, task: AudioTask) -> AudioTask:
    audio_path = task.data["audio_filepath"]
    task.data["pred_text"] = self.asr_model.transcribe(audio_path)
    return task
```

Quality Scoring

```python
def process(self, task: AudioTask) -> AudioTask:
    if "text" in task.data and "pred_text" in task.data:
        task.data["wer"] = compute_wer(task.data["text"], task.data["pred_text"])
    return task
```
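The snippet above assumes a `compute_wer` helper. A minimal word-level implementation (a sketch for illustration, not NeMo Curator's own) computes word error rate as the word-level Levenshtein distance divided by the reference length:

```python
def compute_wer(ref: str, hyp: str) -> float:
    """Word error rate: word-level edit distance / reference word count."""
    r, h = ref.split(), hyp.split()
    # d[i][j] = edit distance between r[:i] and h[:j].
    d = [[0] * (len(h) + 1) for _ in range(len(r) + 1)]
    for i in range(len(r) + 1):
        d[i][0] = i
    for j in range(len(h) + 1):
        d[0][j] = j
    for i in range(1, len(r) + 1):
        for j in range(1, len(h) + 1):
            cost = 0 if r[i - 1] == h[j - 1] else 1
            d[i][j] = min(
                d[i - 1][j] + 1,      # deletion
                d[i][j - 1] + 1,      # insertion
                d[i - 1][j - 1] + cost,  # substitution or match
            )
    return d[len(r)][len(h)] / max(len(r), 1)
```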
