nat.plugins.eval.profiler.inference_optimization.dynamo_metrics#

Dynamo Metrics Collector for NAT Profiler.

This module collects performance metrics from the Dynamo inference stack via Prometheus. Metrics are collected from four Dynamo components:

  • Frontend (:8000): User-facing latency, throughput, token statistics

  • Worker (:8081): KV cache utilization, SGLang backend metrics

  • Router (:8082): Thompson Sampling routing decisions

  • Processor (:8083): Thompson Sampling KVE (KV Efficiency) metrics

Core Optimization Metrics#

The profiler focuses on three core metrics for Dynamo LLM optimization:

  1. KV Efficiency (KVE) - Token-agnostic measure of computational savings:

    • Formula: KVE = cached_tokens / prompt_tokens

    • Measures the fraction of total work saved via KV cache reuse

    • A KVE of 0.8 means 80% of prompt tokens were served from cache

    • Source: Thompson Sampling processor (dynamo_component_thompson_kve_*)

    • Fallback: SGLang native cache_hit_rate if KVE counters unavailable

    • Affected by: prefix_id routing, prefix hints (osl, iat), request patterns

  2. Time to First Token (TTFT) (ttft_p50, ttft_p95, ttft_p99):

    • Latency from request arrival to first token generation

    • Critical for user-perceived responsiveness

    • Affected by queue depth, worker selection, KV cache hits

  3. Inter-Token Latency (ITL) (itl_p50, itl_p95, itl_p99):

    • Time between consecutive token generations during streaming

    • Affects smoothness of streaming responses

    • Influenced by batch scheduling and GPU utilization

Adding New Metrics#

To add a new metric from any Dynamo endpoint:

  1. Find the metric name by curling the endpoint:

    curl -s http://localhost:8081/metrics | grep -i kv
    curl -s http://localhost:8000/metrics | grep -i token
    
  2. Add the Prometheus query to METRIC_QUERIES:

    METRIC_QUERIES = {
        ...
        "my_new_metric": "rate(dynamo_component_my_metric_total[{range}])",
    }
    

    Note: Use {range} placeholder for time range (replaced with config value).

  3. Add the field to DynamoMetricsResult:

    class DynamoMetricsResult(BaseModel):
        ...
        my_new_metric: float | None = Field(
            default=None,
            description="Description of my new metric"
        )
    
  4. Update the collector if needed (optional - for complex metrics):

    If the metric requires special handling (e.g., combining multiple queries), add custom logic in DynamoMetricsCollector.collect().

Metric Reference by Endpoint#

Frontend (:8000/metrics):

dynamo_frontend_requests_total          # Counter: Total requests
dynamo_frontend_inflight_requests       # Gauge: Current inflight
dynamo_frontend_time_to_first_token_seconds_bucket  # Histogram: TTFT
dynamo_frontend_inter_token_latency_seconds_bucket  # Histogram: ITL
dynamo_frontend_output_tokens_total     # Counter: Total output tokens

Worker (:8081/metrics):

dynamo_component_kvstats_gpu_cache_usage_percent    # Gauge: KV cache %
dynamo_component_kvstats_gpu_prefix_cache_hit_rate  # Gauge: Cache hit rate
sglang:cache_hit_rate                   # Gauge: SGLang native cache hit
sglang:gen_throughput                   # Gauge: Generation throughput
sglang:num_running_reqs                 # Gauge: Running requests
sglang:num_queue_reqs                   # Gauge: Queued requests

Router (:8082/metrics):

dynamo_component_requests_total{dynamo_endpoint="find_worker"}
dynamo_component_request_duration_seconds_bucket

Processor (:8083/metrics):

dynamo_component_thompson_requests_total
dynamo_component_thompson_kve_cached_tokens_total
dynamo_component_thompson_kve_prompt_tokens_total
dynamo_component_thompson_routing_decisions_total

See external/dynamo/monitoring/README.md for the complete metrics reference.
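The endpoints listed above serve the standard Prometheus text exposition format, which is what the curl commands in step 1 print; a minimal parsing sketch on a hand-written sample payload (the values below are made up, not real Dynamo output):

```python
# Minimal parser for the Prometheus text exposition format served by the
# endpoints above. The sample payload is made up, not real Dynamo output.
SAMPLE = """\
# HELP dynamo_frontend_requests_total Total requests
# TYPE dynamo_frontend_requests_total counter
dynamo_frontend_requests_total 1234
dynamo_frontend_inflight_requests 7
sglang:cache_hit_rate 0.82
"""

def parse_metrics(text: str) -> dict[str, float]:
    metrics = {}
    for line in text.splitlines():
        if not line or line.startswith("#"):
            continue  # skip HELP/TYPE comment lines
        name, _, value = line.rpartition(" ")
        metrics[name] = float(value)
    return metrics

print(parse_metrics(SAMPLE)["sglang:cache_hit_rate"])  # 0.82
```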

Attributes#

Classes#

DynamoCoreMetrics

Core optimization metrics for Dynamo LLM inference.

DynamoMetricsResult

Results from Dynamo metrics collection.

DynamoMetricsCollector

Collects Dynamo inference stack metrics from Prometheus.

Functions#

collect_dynamo_metrics(→ DynamoMetricsResult)

Convenience function to collect Dynamo metrics.

collect_core_metrics(→ DynamoCoreMetrics)

Convenience function to collect only the three core optimization metrics.

Module Contents#

logger#
METRIC_QUERIES: dict[str, str]#
class DynamoCoreMetrics(/, **data: Any)#

Bases: pydantic.BaseModel

Core optimization metrics for Dynamo LLM inference.

These three metrics are the primary targets for optimization:

  1. KV Efficiency (KVE): Fraction of computational work saved via KV cache.

    • Formula: cached_tokens / prompt_tokens

    • Target: Maximize (closer to 1.0 = more work saved)

    • Affected by: prefix_id routing, prefix hints (osl, iat), request patterns

    • Token-agnostic measure of actual computational savings

  2. TTFT (Time to First Token): User-perceived initial latency.

    • Target: Minimize (lower is better)

    • Affected by: queue depth, worker selection, KV cache hits

  3. ITL (Inter-Token Latency): Streaming smoothness.

    • Target: Minimize (lower is better)

    • Affected by: batch scheduling, GPU utilization, memory bandwidth

Usage:

result = await collector.collect()
core = result.get_core_metrics()

print(f"KV Efficiency: {core.kv_efficiency:.2%}")
print(f"TTFT P95: {core.ttft_p95_seconds:.3f}s")
print(f"ITL P95: {core.itl_p95_seconds:.3f}s")

# Check if all core metrics are available
if core.is_complete():
    print("All core metrics collected successfully")

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

kv_efficiency: float | None = None#
kv_efficiency_fallback: float | None = None#
ttft_p50_seconds: float | None = None#
ttft_p95_seconds: float | None = None#
ttft_p99_seconds: float | None = None#
itl_p50_seconds: float | None = None#
itl_p95_seconds: float | None = None#
itl_p99_seconds: float | None = None#
get_effective_kv_efficiency() float | None#

Get the best available KV efficiency value.

Prefers the true KVE (cached_tokens/prompt_tokens) from Thompson Sampling, falls back to SGLang native cache_hit_rate if KVE is unavailable.

Returns:

KV efficiency (0-1) or None if neither source is available

is_complete() bool#

Check if all core optimization metrics were successfully collected.

Returns:

True if KV efficiency (or fallback), ttft_p95, and itl_p95 are all available

get_optimization_summary() dict[str, float | None]#

Get a summary dict of the primary optimization targets.

Returns:

Dict with the three key metrics for optimization loops

to_optimization_score(
kv_weight: float = 0.4,
ttft_weight: float = 0.4,
itl_weight: float = 0.2,
ttft_target_seconds: float = 0.5,
itl_target_seconds: float = 0.05,
) float | None#

Compute a combined optimization score (higher is better).

This provides a single scalar for optimization algorithms that combines the three core metrics with configurable weights.

Args:

kv_weight: Weight for KV efficiency (0-1)
ttft_weight: Weight for TTFT score (0-1)
itl_weight: Weight for ITL score (0-1)
ttft_target_seconds: Target TTFT for scoring (score=1.0 at target)
itl_target_seconds: Target ITL for scoring (score=1.0 at target)

Returns:

Combined score (0-1) where higher is better, or None if metrics unavailable

Note:

Weights should sum to 1.0. TTFT and ITL scores are computed as target/actual (capped at 1.0) so lower latency = higher score.
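The scoring rule in the Note can be sketched as a standalone function (a hypothetical re-implementation for illustration, not this module's code):

```python
# Hypothetical sketch of the combined score described above;
# not this module's actual implementation.
def optimization_score(
    kve: float, ttft: float, itl: float,
    kv_weight: float = 0.4, ttft_weight: float = 0.4, itl_weight: float = 0.2,
    ttft_target: float = 0.5, itl_target: float = 0.05,
) -> float:
    # Latency scores are target/actual, capped at 1.0, so latency at or
    # below target scores 1.0 and worse latency scores proportionally less.
    ttft_score = min(1.0, ttft_target / ttft)
    itl_score = min(1.0, itl_target / itl)
    return kv_weight * kve + ttft_weight * ttft_score + itl_weight * itl_score

# At-target latencies with KVE 0.8: 0.4*0.8 + 0.4*1.0 + 0.2*1.0 = 0.92
print(optimization_score(kve=0.8, ttft=0.5, itl=0.05))
```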

class DynamoMetricsResult(/, **data: Any)#

Bases: pydantic.BaseModel

Results from Dynamo metrics collection.

To add a new metric:

  1. Add a field here with appropriate type and description

  2. Add the corresponding Prometheus query to METRIC_QUERIES above

  3. The collector will automatically populate it

All metrics are optional (None) to handle cases where:

  • The metric endpoint is unavailable

  • Prometheus query returns no data

  • The Dynamo component is not running

For optimization, use get_core_metrics() to extract the three primary optimization targets (KV Cache Efficiency, TTFT, ITL).

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

kve_cached_tokens_rate: float | None = None#
kve_prompt_tokens_rate: float | None = None#
kve_device_blocks_rate: float | None = None#
kve_host_blocks_rate: float | None = None#
kve_disk_blocks_rate: float | None = None#
kv_cache_usage_percent: float | None = None#
kv_cache_hit_rate_sglang: float | None = None#
kv_cache_hit_rate_dynamo: float | None = None#
ttft_p50: float | None = None#
ttft_p95: float | None = None#
ttft_p99: float | None = None#
itl_p50: float | None = None#
itl_p95: float | None = None#
itl_p99: float | None = None#
inflight_requests_frontend: float | None = None#
inflight_requests_worker: float | None = None#
queued_requests: float | None = None#
requests_per_minute: float | None = None#
token_throughput: float | None = None#
sglang_running_requests: float | None = None#
sglang_queue_depth: float | None = None#
sglang_gen_throughput: float | None = None#
sglang_utilization: float | None = None#
thompson_routing_decisions_rate: float | None = None#
thompson_requests_rate: float | None = None#
collection_timestamp: float | None = None#
prometheus_url: str | None = None#
errors: list[str] = None#
compute_kv_efficiency() float | None#

Compute KV Efficiency (KVE) from Thompson Sampling processor metrics.

KVE = cached_tokens / prompt_tokens

This measures the fraction of computational work saved via KV cache reuse. A KVE of 0.8 means 80% of prompt tokens were served from cache.

Returns:

KVE (0-1) if both metrics are available and prompt_tokens > 0, else None

get_core_metrics() DynamoCoreMetrics#

Extract the three core optimization metrics.

KV Efficiency is computed as cached_tokens / prompt_tokens from the Thompson Sampling processor. Falls back to SGLang native cache_hit_rate if KVE counters are unavailable.

Returns:

DynamoCoreMetrics with KV efficiency, TTFT, and ITL

Usage:

result = await collector.collect()
core = result.get_core_metrics()

if core.is_complete():
    score = core.to_optimization_score()
    print(f"Optimization score: {score:.3f}")
has_core_metrics() bool#

Check if all three core optimization metrics are available.

Returns:

True if kv_cache_hit_rate, ttft_p95, and itl_p95 are all collected

class DynamoMetricsCollector(
config: nat.data_models.profiler.DynamoMetricsConfig,
)#

Collects Dynamo inference stack metrics from Prometheus.

Usage:

from nat.plugins.eval.profiler.inference_optimization.dynamo_metrics import DynamoMetricsCollector
from nat.data_models.profiler import DynamoMetricsConfig

config = DynamoMetricsConfig(enable=True, prometheus_url="http://localhost:9090")
collector = DynamoMetricsCollector(config)
result = await collector.collect()

print(f"TTFT P95: {result.ttft_p95}")
print(f"KV Cache Usage: {result.kv_cache_usage_percent}%")

Initialize the collector with configuration.

Args:

config: DynamoMetricsConfig with Prometheus URL and metric toggles

config#
prometheus_url#
async collect() DynamoMetricsResult#

Collect all enabled Dynamo metrics from Prometheus.

Returns:

DynamoMetricsResult with collected metric values

_get_enabled_metrics() dict[str, str]#

Get the subset of METRIC_QUERIES enabled by config.

Returns:

Dict mapping metric names to their Prometheus queries

async _query_prometheus(
client: httpx.AsyncClient,
query: str,
) float | None#

Execute a Prometheus query and extract the scalar result.

First attempts an instant query. If no data is returned (e.g., because rate() returns 0 after workflow completion), falls back to a range query with historical lookback to capture the most recent non-zero value.

Args:

client: httpx AsyncClient
query: PromQL query string

Returns:

Float value if successful, None if no data or error

async _query_prometheus_instant(
client: httpx.AsyncClient,
query: str,
) float | None#

Execute a Prometheus instant query.

Args:

client: httpx AsyncClient
query: PromQL query string

Returns:

Float value if successful, None if no data or error

async _query_prometheus_range(
client: httpx.AsyncClient,
query: str,
) float | None#

Execute a Prometheus range query with historical lookback.

This captures metrics that were recorded during the workflow execution but are no longer updating (rate() would return 0 for instant queries).

The time window is determined by:

  1. If workflow timestamps are set: query from workflow start to workflow end (isolated to this eval)

  2. If lookback_seconds is set: query that many seconds back from now

  3. Otherwise: default to 10 minutes (600 seconds)

Args:

client: httpx AsyncClient
query: PromQL query string

Returns:

The most recent non-NaN, non-zero value if found, None otherwise

async health_check() dict[str, Any]#

Check connectivity to Prometheus and Dynamo endpoints.

Returns:

Dict with health status for each component

async collect_dynamo_metrics(
config: nat.data_models.profiler.DynamoMetricsConfig,
) DynamoMetricsResult#

Convenience function to collect Dynamo metrics.

Args:

config: DynamoMetricsConfig with collection settings

Returns:

DynamoMetricsResult with collected metrics

async collect_core_metrics(
prometheus_url: str = 'http://localhost:9090',
query_range: str = '30s',
) DynamoCoreMetrics#

Convenience function to collect only the three core optimization metrics.

This is a simplified interface for optimization loops that only need:

  • KV Cache Efficiency

  • Time to First Token (TTFT)

  • Inter-Token Latency (ITL)

Args:

prometheus_url: Prometheus server URL
query_range: Time range for rate calculations (e.g., '1m', '5m')

Returns:

DynamoCoreMetrics with the three core metrics

Usage:

from nat.plugins.eval.profiler.inference_optimization.dynamo_metrics import collect_core_metrics

# Quick collection for optimization
core = await collect_core_metrics()

if core.is_complete():
    print(f"KV Efficiency: {core.kv_efficiency:.2%}")
    print(f"TTFT P95: {core.ttft_p95_seconds:.3f}s")
    print(f"ITL P95: {core.itl_p95_seconds:.3f}s")

    # Get combined optimization score
    score = core.to_optimization_score()
    print(f"Combined score: {score:.3f}")