Source code for nv_ingest_api.util.dataloader.dataloader
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2025, NVIDIA CORPORATION.
from pathlib import Path
from abc import ABC, abstractmethod
import queue
import threading
import subprocess
import json
import logging
import math
import importlib.util
from enum import Enum
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import os
import glob
from nv_ingest_api.util.system.hardware_info import SystemResourceProbe
logger = logging.getLogger(__name__)

# Optional-dependency probe: the loader needs both the ffmpeg-python package and
# the ffmpeg binary. If either is missing, DataLoader/MediaInterface are None.
try:
    # find_spec() returns None (it does not raise) when the package is absent,
    # so its result must be checked explicitly.
    if importlib.util.find_spec("ffmpeg") is None:
        raise ImportError("the ffmpeg-python package is not installed")
    # Raises FileNotFoundError when the binary is not on PATH, and
    # CalledProcessError when it exists but cannot run.
    subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
    import ffmpeg
except Exception:
    logger.error(
        "Unable to load the Dataloader, ffmpeg was not installed, "
        "please install it using `pip install ffmpeg-python` and `apt-get install ffmpeg`"
    )
    ffmpeg = None

if not ffmpeg:
    DataLoader = None
    MediaInterface = None
else:
[docs]
class LoaderInterface(ABC):
    """
    Abstract base for objects that split an input file into smaller chunk files.

    DataLoader calls ``split`` with extra keyword options (``split_type``,
    ``video_audio_separate``, ``audio_only``) and reads metadata through
    ``_get_path_metadata``.

    NOTE(review): DataLoader also calls ``probe_media`` on the interface, which is
    not declared here. Custom implementations must provide it as well.
    """

    @abstractmethod
    def split(self, input_path: str, output_dir: str, split_interval: int = 0, **kwargs):
        """
        Split ``input_path`` into chunks written to ``output_dir``.

        input_path: str, file to split
        output_dir: str, directory that receives the chunk files
        split_interval: int, chunk size; its unit is implementation-defined
        kwargs: implementation-specific options
        """

    @abstractmethod
    def _get_path_metadata(self, path: str = None):
        """
        Return metadata recorded for ``path``, or for all paths when ``path`` is None.
        """
def _probe(filename, format=None, file_handle=None, timeout=None, **kwargs):
    """
    Run ffprobe on a file (or on piped bytes) and return its parsed JSON output.

    filename: str, path handed to ffprobe when ``file_handle`` is not given
    format: unused; accepted for call-site compatibility (callers pass the file suffix)
    file_handle: bytes written to ffprobe's stdin (read via ``pipe:``) instead of ``filename``
    timeout: float, seconds to wait for ffprobe; None waits indefinitely
    kwargs: extra ffprobe options, turned into command-line flags by ffmpeg-python's helper
    Returns the decoded JSON dict (``-show_format -show_streams -of json``).
    Raises ffmpeg's Error when ffprobe exits non-zero, and subprocess.TimeoutExpired
    when ``timeout`` elapses (the child is killed first).
    """
    args = ["ffprobe", "-show_format", "-show_streams", "-of", "json"]
    args += ffmpeg._utils.convert_kwargs_to_cmd_line_args(kwargs)
    args.append("pipe:" if file_handle else filename)

    communicate_kwargs = {}
    if timeout is not None:
        communicate_kwargs["timeout"] = timeout
    if file_handle:
        communicate_kwargs["input"] = file_handle

    p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        out, err = p.communicate(**communicate_kwargs)
    except subprocess.TimeoutExpired:
        # communicate() does not kill the child on timeout; kill and reap it so
        # the ffprobe process and its pipes do not leak.
        p.kill()
        p.communicate()
        raise
    if p.returncode != 0:
        raise ffmpeg._run.Error("ffprobe", out, err)
    return json.loads(out.decode("utf-8"))
def _get_audio_from_video(input_path: str, output_file: str, cache_path: str = None):
    """
    Extract the audio of a media file into an MP3 (libmp3lame, mapping ``0:a``).

    input_path: str, path to the video file
    output_file: str, path of the MP3 to write; parent directories are created and
        an existing file is overwritten
    cache_path: str, currently unused; kept for interface compatibility
    Returns the output path as a Path, or None if the ffmpeg run fails (the error is logged).
    """
    output_path = Path(output_file)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        (
            ffmpeg.input(str(input_path))
            .output(str(output_path), acodec="libmp3lame", map="0:a")
            .overwrite_output()
            .run(capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        # ffmpeg output is not guaranteed to be valid UTF-8; don't let decoding mask the error.
        stderr = e.stderr.decode(errors="replace") if e.stderr else ""
        logger.error(f"FFmpeg error for file {input_path}: {stderr}")
        return None
    return output_path
[docs]
def strip_audio_from_video_files(
    input_path: str,
    output_dir: str,
    cache_path: str = None,
    file_type=".mp4",
    max_workers: int = 15,
):
    """
    Extract the audio of one video file, or of every ``*{file_type}`` file in a directory.

    input_path: str, a video file or a directory containing video files
    output_dir: str, directory that receives ``<stem>.mp3`` files (created if missing)
    cache_path: str, currently unused; kept for interface compatibility
    file_type: str, suffix matched when ``input_path`` is a directory
    max_workers: int, number of concurrent ffmpeg extractions
    Returns the paths (as str) of the MP3 files that were created successfully;
    files whose extraction failed are logged by the extractor and omitted.
    """
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    path = Path(input_path)
    if path.is_file():
        files = [path]
    else:
        # Sorted so the result order is deterministic regardless of filesystem order.
        files = sorted(Path(file) for file in glob.glob(os.path.join(path, f"*{file_type}")))

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(_get_audio_from_video, file, output_path / f"{file.stem}.mp3") for file in files]
        results = [future.result() for future in tqdm(futures)]

    # _get_audio_from_video returns None on failure; never report that as the path "None".
    return [str(result) for result in results if result is not None]
[docs]
class MediaInterface(LoaderInterface):
    """
    LoaderInterface implementation that splits audio/video files with ffmpeg.

    The ffprobe output of every file it splits is stored in ``path_metadata``,
    keyed by the path that was actually split.
    """

    # Suffixes treated as video containers when extracting audio.
    _VIDEO_SUFFIXES = (".mp4", ".mov", ".avi", ".mkv")

    def __init__(self):
        self.path_metadata = {}

    @staticmethod
    def _parse_rate(rate) -> float:
        """
        Convert an ffprobe rational such as "30000/1001" (or a plain "25") to a float.

        Returns 0.0 when the denominator is zero, e.g. "0/0".
        Raises ValueError when the value is not numeric.
        """
        numerator, _, denominator = str(rate).partition("/")
        den = float(denominator) if denominator else 1.0
        return float(numerator) / den if den else 0.0

    def probe_media(self, path_file: Path, split_interval: int, split_type: SplitType, file_handle=None):
        """
        Probe a media file and work out how many chunks it should be split into.

        path_file: Path, file to probe (its size is always read from disk)
        split_interval: int, chunk size in units of ``split_type``
        split_type: SplitType, how chunk boundaries are chosen
        file_handle: optional bytes piped to ffprobe instead of reading ``path_file``
        Returns (probe, num_splits, duration). Entries are None when probing or the
        split computation failed; the error is logged, not raised.
        """
        num_splits = None
        duration = None
        probe = None
        try:
            file_size = path_file.stat().st_size  # in bytes
            if file_handle:
                probe = _probe("pipe:", format=path_file.suffix, file_handle=file_handle)
            else:
                probe = _probe(str(path_file), format=path_file.suffix)
            stream = probe["streams"][0]
            codec_type = stream["codec_type"]
            if codec_type == "video":
                # avg_frame_rate is a rational "num/den"; taking only the numerator
                # would report e.g. 30000 fps for "30000/1001".
                sample_rate = self._parse_rate(stream["avg_frame_rate"])
                duration = float(probe["format"]["duration"])
            elif codec_type == "audio":
                sample_rate = float(stream["sample_rate"])
                # Duration estimated from file size and the container bit rate.
                duration = (file_size * 8) / float(probe["format"]["bit_rate"])
            else:
                raise ValueError(f"unsupported stream type {codec_type!r}")
            num_splits = self.find_num_splits(file_size, sample_rate, duration, split_interval, split_type)
        except ffmpeg.Error as e:
            stderr = e.stderr.decode(errors="replace") if e.stderr else ""
            logger.error(f"FFmpeg error for file {path_file}: {stderr}")
        except (ValueError, KeyError, IndexError, ZeroDivisionError) as e:
            logger.error(f"Error finding number of splits for file {path_file}: {e}")
        return probe, num_splits, duration

    def get_audio_from_video(self, input_path: str, output_file: str, cache_path: str = None):
        """Extract the audio of ``input_path`` into ``output_file``; returns its Path or None on failure."""
        return _get_audio_from_video(input_path, output_file, cache_path)

    def split(
        self,
        input_path: str,
        output_dir: str,
        split_interval: int = 0,
        split_type: SplitType = SplitType.SIZE,
        cache_path: str = None,
        video_audio_separate: bool = False,
        audio_only: bool = False,
    ):
        """
        Split a media file into chunks of ``split_interval`` (in units of ``split_type``).

        If ``audio_only`` is True and the input is a video, its audio is extracted to a
        temporary MP3, which is split and then deleted. If ``video_audio_separate`` is
        True and the file is a video, the audio of each chunk is also extracted and the
        MP3 paths are appended after the video chunk paths.

        input_path: str, path to the media file
        output_dir: str, path to the output directory (created if missing)
        split_interval: chunk size for ``split_type``; a non-positive value means "no split"
        split_type: SplitType, type of split to perform: size, time, or frame
        cache_path: str, directory handed to audio extraction; defaults to ``output_dir``
        video_audio_separate: bool, whether to separate the video and audio files
        audio_only: bool, whether to split only the audio of a video input
        Returns a list of chunk paths (str); an empty list if probing or splitting fails.
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        original_input_path = input_path
        files_to_remove = []
        try:
            if audio_only and Path(input_path).suffix in self._VIDEO_SUFFIXES:
                input_path = self.get_audio_from_video(input_path, output_dir / f"{Path(input_path).stem}.mp3")
                if input_path is None:
                    logger.error(f"Failed to extract audio from {original_input_path}")
                    return []
                files_to_remove.append(input_path)
            return self._split_file(
                input_path,
                original_input_path,
                output_dir,
                split_interval,
                split_type,
                cache_path or output_dir,
                video_audio_separate,
            )
        finally:
            # Always delete the temporary extracted audio, even when splitting failed.
            for to_remove in map(Path, files_to_remove):
                if to_remove.is_file():
                    logger.debug(f"Removing file {to_remove}")
                    to_remove.unlink()

    def _split_file(
        self,
        input_path,
        original_input_path,
        output_dir: Path,
        split_interval: int,
        split_type: SplitType,
        cache_path,
        video_audio_separate: bool,
    ):
        """Run the ffmpeg segment split for ``input_path`` and return the chunk paths."""
        path_file = Path(input_path)
        file_name = path_file.stem
        suffix = path_file.suffix
        output_pattern = output_dir / f"{file_name}_chunk_%04d{suffix}"

        probe, num_splits, duration = self.probe_media(path_file, split_interval, split_type)
        if not num_splits or duration is None:
            # probe_media already logged the reason.
            logger.error(f"Unable to determine split parameters for file {original_input_path}")
            return []
        segment_time = math.ceil(duration / num_splits)

        try:
            capture_output, capture_error = (
                ffmpeg.input(str(input_path))
                .output(str(output_pattern), **self._segment_output_kwargs(suffix, segment_time))
                .run(capture_stdout=True, capture_stderr=True)
            )
        except ffmpeg.Error as e:
            # The run's captured output only exists on the exception object here.
            stderr = e.stderr.decode(errors="replace") if e.stderr else ""
            logger.error(f"FFmpeg error for file {original_input_path}: {stderr}")
            return []

        logger.debug(f"Split {input_path} into {num_splits} chunks")
        self.path_metadata[input_path] = probe
        logger.debug(capture_output)
        logger.debug(f"{original_input_path} - {capture_error}")

        files = [str(output_dir / f"{file_name}_chunk_{i:04d}{suffix}") for i in range(int(num_splits))]
        if not (video_audio_separate and suffix in self._VIDEO_SUFFIXES):
            return files

        audio_files = []
        for file in files:
            chunk = Path(file)
            audio_path = self.get_audio_from_video(chunk, chunk.with_suffix(".mp3"), cache_path)
            if audio_path is not None:
                audio_files.append(str(audio_path))
            else:
                logger.error(f"Failed to extract audio from {chunk}")
        return files + audio_files

    @staticmethod
    def _segment_output_kwargs(suffix: str, segment_time: int) -> dict:
        """Build the ffmpeg output options for the segment muxer."""
        output_kwargs = {
            "f": "segment",
            "segment_time": segment_time,
            "c": "copy",
            "map": "0",
            # 20% of SystemResourceProbe's effective core count, with a floor of 4 threads.
            "threads": int(max(SystemResourceProbe().get_effective_cores() * 0.2, 4)),
        }
        if suffix == ".mp4":
            # NOTE(review): with "c": "copy" ffmpeg does not re-encode, so these encoder
            # options probably have no effect; confirm whether re-encoding was intended.
            output_kwargs.update(
                {
                    "force_key_frames": f"expr:gte(t,n_forced*{segment_time})",
                    "crf": 22,
                    "g": 50,
                    "sc_threshold": 0,
                }
            )
        return output_kwargs

    def find_num_splits(
        self,
        file_size: int,
        sample_rate: float,
        duration: float,
        split_interval: int,
        split_type: SplitType,
    ):
        """
        Find the number of splits for a media file based on the split type and interval.

        file_size: int, size of the media file in bytes
        sample_rate: float, sample rate (audio) or frame rate (video) per second
        duration: float, duration of the media file in seconds
        split_interval: int, chunk size in units of ``split_type``; a non-positive value
            means "do not split" and yields 1
        split_type: SplitType, type of split to perform: size, time, or frame
        Returns at least 1.
        Raises ValueError for an unknown split type, or for a frame split without a
        usable rate.
        """
        if split_type not in (SplitType.SIZE, SplitType.TIME, SplitType.FRAME):
            raise ValueError(f"Invalid split type: {split_type}")
        if split_interval is None or split_interval <= 0:
            # split() defaults to 0; treat it as a single chunk instead of dividing by zero.
            return 1
        if split_type == SplitType.SIZE:
            num_splits = math.ceil(file_size / split_interval)
        elif split_type == SplitType.TIME:
            num_splits = math.ceil(duration / split_interval)
        else:  # SplitType.FRAME
            if not sample_rate:
                raise ValueError("cannot split by frame without a known frame/sample rate")
            seconds_cap = split_interval / sample_rate
            num_splits = math.ceil(duration / seconds_cap)
        # Empty/zero-length media still maps to one chunk, so callers never divide by 0.
        return max(num_splits, 1)

    def _get_path_metadata(self, path: str = None):
        """
        Get the metadata recorded by ``split``.

        path: str or Path, return only that path's ffprobe output (None if unknown);
            if None, return the whole path -> metadata mapping
        """
        if path is None:
            return self.path_metadata
        # Keys are whatever split() received (str or Path), so try both forms.
        for key in (path, Path(path), str(path)):
            if key in self.path_metadata:
                return self.path_metadata[key]
        return None
[docs]
def load_data(queue: queue.Queue, paths: list[str], thread_stop: threading.Event):
    """
    Producer for DataLoader: read each file in ``paths`` and put its bytes on ``queue``.

    On a read error a RuntimeError is put on the queue. StopIteration is put last
    as an end-of-data sentinel. Once ``thread_stop`` is set, no more items are
    queued and the function returns without blocking on a full queue.

    queue: queue.Queue, destination for payloads (may be bounded)
    paths: list[str], files to read, in order
    thread_stop: threading.Event, set by the consumer to abandon loading
    """
    # The ``queue`` parameter shadows the module name inside this function.
    from queue import Full

    def _put(item) -> bool:
        """Put ``item`` on the queue, giving up (returning False) once a stop is requested."""
        while not thread_stop.is_set():
            try:
                queue.put(item, timeout=0.1)
                return True
            except Full:
                continue
        return False

    file = None
    logger.info(f"Loading data for {len(paths)} files")
    try:
        for file in paths:
            if thread_stop.is_set():
                return
            with open(file, "rb") as f:
                payload = f.read()
            if not _put(payload):
                return
    except Exception as e:
        logger.error(f"Error processing file {file} type: {type(file)} {e}")
        _put(RuntimeError(f"Error processing file {file}: {e}"))
    finally:
        _put(StopIteration)
[docs]
class DataLoader:
    """
    Split one media file with a LoaderInterface and stream the chunks' bytes.

    The file at ``path`` is split into ``output_dir`` as soon as the loader is
    constructed. Iterating starts a background thread (``load_data``) that reads each
    chunk and pushes its bytes through a bounded queue. Indexing reads a chunk directly.

    path: str, path to the media file to split
    output_dir: str, directory where chunk files are written
    split_type: SplitType, how chunk boundaries are chosen (size, time or frame)
    split_interval: int, chunk size in units of ``split_type``
    interface: LoaderInterface, splitter to use; defaults to MediaInterface()
    size: int, maximum number of chunk payloads buffered in the queue
    video_audio_separate: bool, also extract the audio of video chunks
    audio_only: bool, split only the audio of a video input
    """

    def __init__(
        self,
        path: str,
        output_dir: str,
        split_type: SplitType = SplitType.SIZE,
        split_interval: int = 450,
        interface: LoaderInterface = None,
        size: int = 2,
        video_audio_separate: bool = False,
        audio_only: bool = False,
    ):
        interface = interface if interface else MediaInterface()
        self.thread = None
        self.thread_stop = threading.Event()
        self.queue = queue.Queue(size)
        self.path = Path(path)
        self.output_dir = output_dir
        self.split_interval = split_interval
        self.interface = interface
        # (chunk_path, duration) tuples, filled in by _process()
        self.files_completed = []
        self.split_type = split_type
        self.video_audio_separate = video_audio_separate
        self.audio_only = audio_only
        # process the file immediately on instantiation
        self._process()

    def _process(self):
        """Split ``self.path`` and record (chunk_path, duration) for every chunk produced."""
        chunk_paths = self.interface.split(
            self.path,
            self.output_dir,
            split_interval=self.split_interval,
            split_type=self.split_type,
            video_audio_separate=self.video_audio_separate,
            audio_only=self.audio_only,
        )
        durations = []
        for chunk in chunk_paths:
            # NOTE(review): probe_media is not declared on LoaderInterface; custom
            # interfaces must still provide it.
            _, _, duration = self.interface.probe_media(
                Path(chunk), split_interval=self.split_interval, split_type=self.split_type
            )
            durations.append(duration)
        self.files_completed = list(zip(chunk_paths, durations))

    def __enter__(self):
        return self

    def __next__(self):
        """
        Return the next chunk's bytes.

        Raises StopIteration at the end of the data. A RuntimeError reported by the
        loader thread is re-raised here instead of being returned as a payload.
        """
        payload = self.queue.get()
        if payload is StopIteration:
            raise StopIteration
        if isinstance(payload, Exception):
            raise payload
        return payload

    def _drain_queue(self):
        """Discard every payload currently buffered in the queue."""
        try:
            while True:
                self.queue.get_nowait()
        except queue.Empty:
            pass

    def stop(self):
        """
        Reset the iterator by stopping the loader thread and clearing the queue.
        """
        thread = self.thread
        if thread:
            self.thread_stop.set()
            # Keep draining while waiting: the producer may be blocked in put() on a
            # full queue, and a plain join() would then never return.
            while thread.is_alive():
                self._drain_queue()
                thread.join(timeout=0.1)
            self.thread = None
        self._drain_queue()
        self.thread_stop.clear()

    def __iter__(self):
        self.stop()
        self.thread_stop.clear()
        self.thread = threading.Thread(
            target=load_data,
            args=(
                self.queue,
                [file for file, _ in self.files_completed],
                self.thread_stop,
            ),
            daemon=True,
        )
        self.thread.start()
        return self

    def __len__(self):
        return len(self.files_completed)

    def __getitem__(self, index):
        """Return the raw bytes of chunk ``index``; errors are logged and re-raised."""
        entry = self.files_completed[index]
        file_path = entry[0] if isinstance(entry, tuple) else entry
        try:
            with open(file_path, "rb") as f:
                return f.read()
        except Exception as e:
            logger.error(f"Error getting item {index}: {e}")
            raise

    def __del__(self):
        # __init__ may have failed before the queue/thread attributes were created.
        if hasattr(self, "queue"):
            self.stop()

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()

    def get_metadata(self, path: str = None):
        """
        Get the metadata recorded by the interface.

        path: str, path to get the metadata for; if None, get the metadata for all paths
        """
        if path is None:
            return self.interface._get_path_metadata()
        return self.interface._get_path_metadata(path)