Source code for nv_ingest_client.util.dataset

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import json
import os
import random
from collections import Counter
from io import BytesIO
from io import StringIO
from pprint import pformat


def get_dataset_statistics(dataset_bytes: BytesIO) -> str:
    """
    Summarize a JSON dataset specification as a human-readable report.

    The specification is expected to be a JSON object with an optional
    ``sampled_files`` list of file paths and an optional ``metadata`` object.
    File types are derived from each path's extension, compared
    case-insensitively, so ``a.PDF`` and ``b.pdf`` are both reported under
    ``pdf``. Paths that occur several times in ``sampled_files`` count several
    times toward the totals and sizes, but only once toward the "unique"
    figures.

    Parameters
    ----------
    dataset_bytes : BytesIO
        Buffer containing the dataset specification in JSON format. It is
        rewound to the start before being read.

    Returns
    -------
    str
        The line ``Dataset Statistics:`` followed by a pretty-printed
        dictionary of the computed statistics.

    Raises
    ------
    json.JSONDecodeError
        If the buffer does not contain valid JSON. This is a ``ValueError``
        subclass.
    OSError
        If the size of a listed file cannot be read from the file system.
    KeyError
        If an entry under ``metadata["file_type_proportions"]`` lacks a
        ``target_proportion`` value.
    """
    dataset_bytes.seek(0)
    dataset = json.load(dataset_bytes)

    sampled_files = dataset.get("sampled_files", [])
    metadata = dataset.get("metadata", {})

    # Stat each distinct path once; duplicates in the sample reuse the result.
    unique_files = set(sampled_files)
    size_by_file = {path: os.path.getsize(path) for path in unique_files}

    # Group by the normalized extension in a single pass so that counts,
    # unique counts and sizes all agree on which type a file belongs to.
    file_type_counts = Counter()
    unique_paths_by_type = {}
    size_bytes_by_type = {}
    for path in sampled_files:
        ftype = os.path.splitext(path)[1][1:].lower()
        file_type_counts[ftype] += 1
        unique_paths_by_type.setdefault(ftype, set()).add(path)
        size_bytes_by_type[ftype] = size_bytes_by_type.get(ftype, 0) + size_by_file[path]

    bytes_per_gb = 1024**3
    unique_file_types = {ftype: len(paths) for ftype, paths in unique_paths_by_type.items()}
    total_size_gb = sum(size_by_file[path] for path in sampled_files) / bytes_per_gb
    file_type_sizes_gb = {ftype: size / bytes_per_gb for ftype, size in size_bytes_by_type.items()}

    # The proportions are optional: a spec without them yields no estimates
    # instead of failing with KeyError.
    proportions = (metadata or {}).get("file_type_proportions") or {}
    estimated_sizes_gb = {
        ftype: spec["target_proportion"] / 100 * total_size_gb for ftype, spec in proportions.items()
    }

    stats = {
        "metadata": metadata,
        "total_number_of_files": len(sampled_files),
        "total_number_of_unique_files": len(unique_files),
        "total_number_of_files_per_file_type": file_type_counts,
        "total_number_of_unique_files_per_file_type": unique_file_types,
        "total_size_gb": total_size_gb,
        "total_size_per_file_type_gb": file_type_sizes_gb,
        "estimated_total_size_per_file_type_gb": estimated_sizes_gb,
    }

    return "Dataset Statistics:\n" + pformat(stats)
def get_dataset_files(dataset_bytes: BytesIO, shuffle: bool = False) -> list:
    """
    Return the file list stored in a JSON dataset specification.

    Parameters
    ----------
    dataset_bytes : BytesIO
        Buffer containing the dataset specification in JSON format. It is
        rewound to the start before being read.
    shuffle : bool, optional
        When True, the list is shuffled in place with ``random.shuffle``
        before it is returned. Defaults to False.

    Returns
    -------
    list
        The value stored under ``sampled_files``, or an empty list when the
        key is absent.

    Raises
    ------
    ValueError
        If the buffer does not contain valid JSON; the message is the text of
        the underlying decode error.
    """
    dataset_bytes.seek(0)

    # Only the decode step can raise JSONDecodeError, so guard just that call.
    try:
        spec = json.load(dataset_bytes)
    except json.JSONDecodeError as err:
        raise ValueError(f"{err}")

    files = spec.get("sampled_files", [])
    if shuffle:
        random.shuffle(files)
    return files