nat.plugins.eval.profiler.inference_optimization.dynamo_metrics#
Dynamo Metrics Collector for NAT Profiler.
This module collects performance metrics from the Dynamo inference stack via Prometheus. Metrics are collected from four Dynamo components:
- Frontend (`:8000`): User-facing latency, throughput, token statistics
- Worker (`:8081`): KV cache utilization, SGLang backend metrics
- Router (`:8082`): Thompson Sampling routing decisions
- Processor (`:8083`): Thompson Sampling KVE (KV Efficiency) metrics
Core Optimization Metrics#
The profiler focuses on three core metrics for Dynamo LLM optimization:
KV Efficiency (KVE): token-agnostic measure of computational savings:
- Formula: `KVE = cached_tokens / prompt_tokens`
- Measures the fraction of total work saved via KV cache reuse
- A KVE of 0.8 means 80% of prompt tokens were served from cache
- Source: Thompson Sampling processor (`dynamo_component_thompson_kve_*`)
- Fallback: SGLang native `cache_hit_rate` if KVE counters are unavailable
- Affected by: prefix_id routing, prefix hints (osl, iat), request patterns
Time to First Token (TTFT) (`ttft_p50`, `ttft_p95`, `ttft_p99`):
- Latency from request arrival to first token generation
- Critical for user-perceived responsiveness
- Affected by queue depth, worker selection, KV cache hits
Inter-Token Latency (ITL) (`itl_p50`, `itl_p95`, `itl_p99`):
- Time between consecutive token generations during streaming
- Affects smoothness of streaming responses
- Influenced by batch scheduling and GPU utilization
Adding New Metrics#
To add a new metric from any Dynamo endpoint:
Find the metric name by curling the endpoint:

```shell
curl -s http://localhost:8081/metrics | grep -i kv
curl -s http://localhost:8000/metrics | grep -i token
```
Add the Prometheus query to `METRIC_QUERIES`:

```python
METRIC_QUERIES = {
    ...
    "my_new_metric": "rate(dynamo_component_my_metric_total[{range}])",
}
```

Note: Use the `{range}` placeholder for the time range (replaced with the config value).

Add the field to `DynamoMetricsResult`:

```python
class DynamoMetricsResult(BaseModel):
    ...
    my_new_metric: float | None = Field(
        default=None,
        description="Description of my new metric",
    )
```
Update the collector if needed (optional, for complex metrics):
If the metric requires special handling (e.g., combining multiple queries), add custom logic in `DynamoMetricsCollector.collect()`.
Metric Reference by Endpoint#
Frontend (`:8000/metrics`):

```
dynamo_frontend_requests_total                      # Counter: Total requests
dynamo_frontend_inflight_requests                   # Gauge: Current inflight
dynamo_frontend_time_to_first_token_seconds_bucket  # Histogram: TTFT
dynamo_frontend_inter_token_latency_seconds_bucket  # Histogram: ITL
dynamo_frontend_output_tokens_total                 # Counter: Total output tokens
```

Worker (`:8081/metrics`):

```
dynamo_component_kvstats_gpu_cache_usage_percent    # Gauge: KV cache %
dynamo_component_kvstats_gpu_prefix_cache_hit_rate  # Gauge: Cache hit rate
sglang:cache_hit_rate                               # Gauge: SGLang native cache hit
sglang:gen_throughput                               # Gauge: Generation throughput
sglang:num_running_reqs                             # Gauge: Running requests
sglang:num_queue_reqs                               # Gauge: Queued requests
```

Router (`:8082/metrics`):

```
dynamo_component_requests_total{dynamo_endpoint="find_worker"}
dynamo_component_request_duration_seconds_bucket
```

Processor (`:8083/metrics`):

```
dynamo_component_thompson_requests_total
dynamo_component_thompson_kve_cached_tokens_total
dynamo_component_thompson_kve_prompt_tokens_total
dynamo_component_thompson_routing_decisions_total
```
See external/dynamo/monitoring/README.md for the complete metrics reference.
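For instance, the processor's KVE counters can be combined into a single ratio query. The PromQL below is an assumed query shape built from the metric names above (shown as a Python string, following the `{range}` convention used by `METRIC_QUERIES`):

```python
# Ratio of cached to prompt tokens over the query window.
KVE_QUERY = (
    "rate(dynamo_component_thompson_kve_cached_tokens_total[{range}])"
    " / "
    "rate(dynamo_component_thompson_kve_prompt_tokens_total[{range}])"
)

print(KVE_QUERY.format(range="5m"))
```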
Attributes#
Classes#
| `DynamoCoreMetrics` | Core optimization metrics for Dynamo LLM inference. |
| `DynamoMetricsResult` | Results from Dynamo metrics collection. |
| `DynamoMetricsCollector` | Collects Dynamo inference stack metrics from Prometheus. |
Functions#
| `collect_dynamo_metrics` | Convenience function to collect Dynamo metrics. |
| `collect_core_metrics` | Convenience function to collect only the three core optimization metrics. |
Module Contents#
- logger#
- class DynamoCoreMetrics(/, **data: Any)#
Bases: `pydantic.BaseModel`

Core optimization metrics for Dynamo LLM inference.

These three metrics are the primary targets for optimization:

KV Efficiency (KVE): fraction of computational work saved via KV cache.
- Formula: `cached_tokens / prompt_tokens`
- Target: maximize (closer to 1.0 = more work saved)
- Affected by: prefix_id routing, prefix hints (osl, iat), request patterns
- Token-agnostic measure of actual computational savings

TTFT (Time to First Token): user-perceived initial latency.
- Target: minimize (lower is better)
- Affected by: queue depth, worker selection, KV cache hits

ITL (Inter-Token Latency): streaming smoothness.
- Target: minimize (lower is better)
- Affected by: batch scheduling, GPU utilization, memory bandwidth

Usage:

```python
result = await collector.collect()
core = result.get_core_metrics()
print(f"KV Efficiency: {core.kv_efficiency:.2%}")
print(f"TTFT P95: {core.ttft_p95_seconds:.3f}s")
print(f"ITL P95: {core.itl_p95_seconds:.3f}s")

# Check if all core metrics are available
if core.is_complete():
    print("All core metrics collected successfully")
```

Create a new model by parsing and validating input data from keyword arguments.

Raises [`ValidationError`][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

`self` is explicitly positional-only to allow `self` as a field name.

- get_effective_kv_efficiency() → float | None#
Get the best available KV efficiency value.
Prefers the true KVE (cached_tokens/prompt_tokens) from Thompson Sampling, falls back to SGLang native cache_hit_rate if KVE is unavailable.
- Returns:
KV efficiency (0-1) or None if neither source is available
- is_complete() → bool#
Check if all core optimization metrics were successfully collected.
- Returns:
True if KV efficiency (or fallback), ttft_p95, and itl_p95 are all available
- get_optimization_summary() → dict[str, float | None]#
Get a summary dict of the primary optimization targets.
- Returns:
Dict with the three key metrics for optimization loops
- to_optimization_score(
- kv_weight: float = 0.4,
- ttft_weight: float = 0.4,
- itl_weight: float = 0.2,
- ttft_target_seconds: float = 0.5,
- itl_target_seconds: float = 0.05,
- ) → float | None#
Compute a combined optimization score (higher is better).
This provides a single scalar for optimization algorithms that combines the three core metrics with configurable weights.
- Args:
kv_weight: Weight for KV efficiency (0-1)
ttft_weight: Weight for TTFT score (0-1)
itl_weight: Weight for ITL score (0-1)
ttft_target_seconds: Target TTFT for scoring (score=1.0 at target)
itl_target_seconds: Target ITL for scoring (score=1.0 at target)
- Returns:
Combined score (0-1) where higher is better, or None if metrics unavailable
- Note:
Weights should sum to 1.0. TTFT and ITL scores are computed as target/actual (capped at 1.0) so lower latency = higher score.
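Per the note above, the scoring rule can be sketched as follows (a minimal standalone version; the real method reads the metric values from the model's fields):

```python
def optimization_score(kve: float, ttft_s: float, itl_s: float,
                       kv_weight: float = 0.4, ttft_weight: float = 0.4,
                       itl_weight: float = 0.2,
                       ttft_target_s: float = 0.5,
                       itl_target_s: float = 0.05) -> float:
    # Latency scores are target/actual, capped at 1.0, so hitting or
    # beating the target scores 1.0 and slower latencies are penalized.
    ttft_score = min(1.0, ttft_target_s / ttft_s)
    itl_score = min(1.0, itl_target_s / itl_s)
    return kv_weight * kve + ttft_weight * ttft_score + itl_weight * itl_score

# At-target latencies with KVE 0.8: 0.4*0.8 + 0.4*1.0 + 0.2*1.0 = 0.92
print(round(optimization_score(0.8, 0.5, 0.05), 2))  # 0.92
```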
- class DynamoMetricsResult(/, **data: Any)#
Bases: `pydantic.BaseModel`

Results from Dynamo metrics collection.

To add a new metric:
1. Add a field here with appropriate type and description
2. Add the corresponding Prometheus query to METRIC_QUERIES above
3. The collector will automatically populate it

All metrics are optional (None) to handle cases where:
- The metric endpoint is unavailable
- The Prometheus query returns no data
- The Dynamo component is not running

For optimization, use `get_core_metrics()` to extract the three primary optimization targets (KV Cache Efficiency, TTFT, ITL).

Create a new model by parsing and validating input data from keyword arguments.

Raises [`ValidationError`][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

`self` is explicitly positional-only to allow `self` as a field name.

- compute_kv_efficiency() → float | None#
Compute KV Efficiency (KVE) from Thompson Sampling processor metrics.
KVE = cached_tokens / prompt_tokens
This measures the fraction of computational work saved via KV cache reuse. A KVE of 0.8 means 80% of prompt tokens were served from cache.
- Returns:
KVE (0-1) if both metrics are available and prompt_tokens > 0, else None
- get_core_metrics() → DynamoCoreMetrics#
Extract the three core optimization metrics.
KV Efficiency is computed as cached_tokens / prompt_tokens from the Thompson Sampling processor. Falls back to SGLang native cache_hit_rate if KVE counters are unavailable.
- Returns:
DynamoCoreMetrics with KV efficiency, TTFT, and ITL
Usage:

```python
result = await collector.collect()
core = result.get_core_metrics()
if core.is_complete():
    score = core.to_optimization_score()
    print(f"Optimization score: {score:.3f}")
```
- class DynamoMetricsCollector(config: DynamoMetricsConfig)#
Collects Dynamo inference stack metrics from Prometheus.
Usage:

```python
from nat.plugins.eval.profiler.inference_optimization.dynamo_metrics import DynamoMetricsCollector
from nat.data_models.profiler import DynamoMetricsConfig

config = DynamoMetricsConfig(enable=True, prometheus_url="http://localhost:9090")
collector = DynamoMetricsCollector(config)
result = await collector.collect()
print(f"TTFT P95: {result.ttft_p95}")
print(f"KV Cache Usage: {result.kv_cache_usage_percent}%")
```
Initialize the collector with configuration.
- Args:
config: DynamoMetricsConfig with Prometheus URL and metric toggles
- config#
- prometheus_url#
- async collect() → DynamoMetricsResult#
Collect all enabled Dynamo metrics from Prometheus.
- Returns:
DynamoMetricsResult with collected metric values
- _get_enabled_metrics() → dict[str, str]#
Get the subset of METRIC_QUERIES enabled by config.
- Returns:
Dict mapping metric names to their Prometheus queries
- async _query_prometheus(
- client: httpx.AsyncClient,
- query: str,
- ) → float | None#
Execute a Prometheus query and extract the scalar result.
First attempts an instant query. If no data is returned (e.g., because rate() returns 0 after workflow completion), falls back to a range query with historical lookback to capture the most recent non-zero value.
- Args:
client: httpx AsyncClient
query: PromQL query string
- Returns:
Float value if successful, None if no data or error
- async _query_prometheus_instant(
- client: httpx.AsyncClient,
- query: str,
- ) → float | None#
Execute a Prometheus instant query.
- Args:
client: httpx AsyncClient
query: PromQL query string
- Returns:
Float value if successful, None if no data or error
- async _query_prometheus_range(
- client: httpx.AsyncClient,
- query: str,
- ) → float | None#
Execute a Prometheus range query with historical lookback.
This captures metrics that were recorded during the workflow execution but are no longer updating (rate() would return 0 for instant queries).
The time window is determined by:
1. If workflow timestamps are set: query from workflow start to workflow end (isolated to this eval)
2. If lookback_seconds is set: query that many seconds back from now
3. Otherwise: default to 10 minutes (600 seconds)
- Args:
client: httpx AsyncClient
query: PromQL query string
- Returns:
The most recent non-NaN, non-zero value if found, None otherwise
- async collect_dynamo_metrics(config: DynamoMetricsConfig) → DynamoMetricsResult#
Convenience function to collect Dynamo metrics.
- Args:
config: DynamoMetricsConfig with collection settings
- Returns:
DynamoMetricsResult with collected metrics
- async collect_core_metrics( ) → DynamoCoreMetrics#
Convenience function to collect only the three core optimization metrics.
This is a simplified interface for optimization loops that only need:
- KV Cache Efficiency
- Time to First Token (TTFT)
- Inter-Token Latency (ITL)
- Args:
prometheus_url: Prometheus server URL
query_range: Time range for rate calculations (e.g., '1m', '5m')
- Returns:
DynamoCoreMetrics with the three core metrics
Usage:

```python
from nat.plugins.eval.profiler.inference_optimization.dynamo_metrics import collect_core_metrics

# Quick collection for optimization
core = await collect_core_metrics()
if core.is_complete():
    print(f"KV Efficiency: {core.kv_efficiency:.2%}")
    print(f"TTFT P95: {core.ttft_p95_seconds:.3f}s")
    print(f"ITL P95: {core.itl_p95_seconds:.3f}s")

    # Get combined optimization score
    score = core.to_optimization_score()
    print(f"Combined score: {score:.3f}")
```