Source code for nv_ingest.framework.util.telemetry.global_stats
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import os
from collections import defaultdict
from collections import deque
from statistics import mean
from statistics import median
TELEMETRY_WINDOW_SIZE = os.getenv("TELEMETRY_WINDOW_SIZE", 100)
[docs]
class GlobalStats:
"""
Singleton class for maintaining global and job-specific statistics within a pipeline.
This class is designed to keep track of various statistics, including the number of submitted and completed jobs,
as well as dynamic statistics (mean and median) for job-specific metrics over a sliding window.
Usage
-----
global_stats = GlobalStats.get_instance()
# Setting and incrementing global statistics
global_stats.set_stat("submitted_jobs", 5)
global_stats.increment_stat("completed_jobs")
# Appending job-specific statistics
global_stats.append_job_stat("job_1", 100)
global_stats.append_job_stat("job_1", 200)
# Retrieving statistics
submitted_jobs = global_stats.get_stat("submitted_jobs")
job_1_mean = global_stats.get_job_stat("job_1", "mean")
Methods
-------
get_instance():
Returns the singleton instance of the GlobalStats class.
reset_all_stats():
Resets all global and job-specific statistics.
set_stat(stat_name, value):
Sets a specific global statistic to the given value.
increment_stat(stat_name, value=1):
Increments a specific global statistic by the given value (default is 1).
append_job_stat(job_name, value):
Appends a value to the job-specific statistics and updates the mean and median for the job.
get_stat(stat_name):
Retrieves the value of a specific global statistic.
get_job_stat(job_name, stat_name):
Retrieves the value of a specific job-specific statistic (mean or median).
get_all_stats():
Returns a dictionary containing all global and job-specific statistics.
Attributes
----------
max_jobs : int
The maximum number of jobs to retain in the statistics window (default is 100).
stats : dict
Dictionary to hold global statistics.
job_stats : defaultdict
Dictionary to hold job-specific statistics, with each job having its own deque for values, mean, and median.
Example
-------
>>> global_stats = GlobalStats.get_instance()
>>> global_stats.increment_stat("submitted_jobs", 10)
>>> global_stats.append_job_stat("job_1", 50)
>>> global_stats.append_job_stat("job_1", 70)
>>> print(global_stats.get_stat("submitted_jobs"))
10
>>> print(global_stats.get_job_stat("job_1", "mean"))
60.0
>>> print(global_stats.get_job_stat("job_1", "median"))
60.0
"""
_instance = None
[docs]
@staticmethod
def get_instance():
if GlobalStats._instance is None:
GlobalStats()
return GlobalStats._instance
def __init__(self):
if GlobalStats._instance is not None:
raise Exception("This class is a singleton. Use `GlobalStats.get_instance()`.")
GlobalStats._instance = self
self.max_jobs = TELEMETRY_WINDOW_SIZE
self.reset_all_stats()
[docs]
def reset_all_stats(self):
self.stats = {
"submitted_jobs": 0,
"completed_jobs": 0,
"failed_jobs": 0,
}
self.job_stats = defaultdict(lambda: {"values": deque(), "mean": 0.0, "median": 0.0})
[docs]
def set_stat(self, stat_name, value):
self.stats[stat_name] = value
[docs]
def increment_stat(self, stat_name, value=1):
self.stats[stat_name] += value
[docs]
def append_job_stat(self, job_name, value):
job_stat = self.job_stats[job_name]
values = job_stat["values"]
if len(values) >= self.max_jobs:
values.popleft()
values.append(value)
job_stat["mean"] = mean(values)
job_stat["median"] = median(values)
[docs]
def get_stat(self, stat_name):
if stat_name not in self.stats:
raise ValueError(f"Key {stat_name} does not exist.")
return self.stats[stat_name]
[docs]
def get_job_stat(self, job_name, stat_name):
return self.job_stats[job_name][stat_name]
[docs]
def get_all_stats(self):
return {
"global_stats": self.stats.copy(),
"job_stats": {job_name: stats.copy() for job_name, stats in self.job_stats.items()},
}
def __str__(self):
return str(self.get_all_stats())