nat.llm.dynamo_llm#

Dynamo LLM provider with automatic nvext.agent_hints and nvext.cache_control injection for KV cache optimization.

This module provides a specialized OpenAI-compatible LLM that sends Dynamo routing hints for optimal KV cache management and request routing. The hint parameters are optimizable via the NAT optimizer.

The implementation uses a custom httpx transport to inject hints at the HTTP level, making it framework-agnostic (works with LangChain, LlamaIndex, ADK).

Transport Mechanism#

All routing hints are injected into nvext.agent_hints (dict in the request body). The default Dynamo frontend passes this through to the preprocessed request, and our custom processor.py reads the routing fields directly from agent_hints.

Standard Dynamo fields (latency_sensitivity, osl, priority) are consumed by Dynamo’s built-in router and engine scheduler. Custom fields (prefix_id, total_requests, iat) are consumed by our custom processor.py.
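For illustration, a chat completion request body after hint injection might look like the following sketch. The field names follow this page; the concrete values are invented for the example:

```python
# Hypothetical post-injection request body. Only the field names come from
# this module's documentation; the values here are made up.
body = {
    "model": "my-model",
    "messages": [{"role": "user", "content": "Hello"}],
    "nvext": {
        "agent_hints": {
            # Standard fields: consumed by Dynamo's router and engine scheduler
            "latency_sensitivity": 2,
            "osl": 512,
            "priority": 1,
            # Custom fields: consumed by the custom processor.py
            "prefix_id": "run-abc123-d0",
            "total_requests": 10,
            "iat": 250,
        }
    },
}
```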

nvext Hint Parameters#

nvext_prefix_osl (Output Sequence Length)

Expected output tokens for response length hinting. Raw integer value is always sent in nvext.agent_hints. Accepts categorical strings (LOW/MEDIUM/HIGH) for backward compatibility, which are converted to representative token counts (128/512/2048).

nvext_prefix_iat (Inter-Arrival Time)

Expected inter-arrival time in milliseconds. Raw integer value is always sent in nvext.agent_hints. Accepts categorical strings (LOW/MEDIUM/HIGH) for backward compatibility, which are converted to representative millisecond values (50/250/750).

nvext_prefix_total_requests

Expected requests per conversation:

  • Higher values increase KV cache affinity and worker stickiness

  • Lower values allow more load balancing
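The categorical-to-integer conversions described above can be sketched as follows. The mapping values (128/512/2048 and 50/250/750) are taken from this page; the helper name `coerce_hint` is hypothetical and stands in for the module's `_coerce_*` validators:

```python
# Representative conversions mirroring _OSL_CATEGORY_TO_INT / _IAT_CATEGORY_TO_INT
# as documented above; the exact module constants may differ.
OSL_CATEGORY_TO_INT = {"LOW": 128, "MEDIUM": 512, "HIGH": 2048}
IAT_CATEGORY_TO_INT = {"LOW": 50, "MEDIUM": 250, "HIGH": 750}

def coerce_hint(value, mapping):
    """Accept either a raw integer or a categorical string (backward compat)."""
    if isinstance(value, str):
        return mapping[value.upper()]
    return int(value)
```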

Attributes#

Classes#

CachePinType

Cache pinning strategy for KV cache entries.

CacheControlMode

Controls when nvext.cache_control is injected into requests.

DynamoPrefixContext

Singleton class for managing Dynamo prefix IDs across LLM calls.

DynamoModelConfig

A Dynamo LLM provider with automatic nvext.agent_hints and nvext.cache_control injection for KV cache optimization.

_DynamoTransport

Custom transport wrapper that injects all routing hints into nvext.agent_hints.

Functions#

create_httpx_client_with_dynamo_hooks(→ httpx.AsyncClient)

Create an httpx.AsyncClient with Dynamo hint injection via custom transport.

dynamo_llm(config, _builder)

Register the Dynamo LLM provider.

Module Contents#

logger#
_OSL_CATEGORY_TO_INT: dict[str, int]#
_IAT_CATEGORY_TO_INT: dict[str, int]#
_DEFAULT_LATENCY_SENSITIVITY: int = 2#
class CachePinType#

Bases: enum.StrEnum

Cache pinning strategy for KV cache entries.

Controls how aggressively the Dynamo KV cache retains entries for a prefix:

  • EPHEMERAL: Cache entries auto-expire after a computed TTL of inactivity. TTL is total_requests * iat (the estimated total conversation duration in milliseconds), giving the expected time span over which this prefix’s cache entries should be retained before eviction.
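The TTL arithmetic described for EPHEMERAL can be sketched directly from the formula above (a hedged illustration; actual eviction is handled by Dynamo):

```python
def ephemeral_ttl_ms(total_requests: int, iat_ms: int) -> int:
    """Estimated conversation duration: expected requests x inter-arrival time (ms)."""
    return total_requests * iat_ms

# e.g. 10 expected requests arriving every 250 ms -> retain for ~2500 ms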


EPHEMERAL = 'ephemeral'#
class CacheControlMode#

Bases: enum.StrEnum

Controls when nvext.cache_control is injected into requests.

  • ALWAYS: Inject on every request (refreshes TTL each turn).

  • FIRST_ONLY: Inject only on the first request per prefix_id, pinning the system prompt when it is first established in the KV cache. Subsequent requests benefit from prefix matching without re-pinning the growing conversation context.


ALWAYS = 'always'#
FIRST_ONLY = 'first_only'#
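A minimal sketch of the FIRST_ONLY gating (hypothetical helper; the actual transport tracks per-prefix call counts under a lock):

```python
# Hypothetical illustration of CacheControlMode gating: inject cache_control
# on every request ("always") or only on the first request per prefix_id.
_call_counts: dict[str, int] = {}

def should_inject_cache_control(mode: str, prefix_id: str) -> bool:
    count = _call_counts.get(prefix_id, 0)
    _call_counts[prefix_id] = count + 1
    if mode == "always":
        return True
    return count == 0  # "first_only": pin only when the prefix is established
```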
class DynamoPrefixContext#

Singleton class for managing Dynamo prefix IDs across LLM calls.

Prefix IDs are unique per depth level in the function call stack, allowing different caching behavior at different levels of nested function calls. Each depth level gets its own prefix ID that remains constant within a single workflow run but changes between runs.

The prefix ID format is: {workflow_run_id}-d{depth}

Usage:

from nat.llm.dynamo_llm import DynamoPrefixContext

# Automatically gets prefix ID based on current call stack depth
prefix_id = DynamoPrefixContext.get()

# Or use as a context manager for explicit control
with DynamoPrefixContext.scope("eval-q001-abc123"):
    # All LLM calls here will use "eval-q001-abc123" prefix
    ...
_prefix_ids_by_depth: contextvars.ContextVar[dict[int, str] | None]#
_override_prefix_id: contextvars.ContextVar[str | None]#
classmethod _get_current_depth() int#

Get the current function call stack depth from Context.

classmethod _get_or_create_depth_map() dict[int, str]#

Get or create the depth -> prefix_id mapping for this context.

classmethod set(prefix_id: str) None#

Set an override prefix ID that takes precedence over depth-based IDs.

Use this when you need explicit control over the prefix ID, such as during batch evaluation where each question should have a specific ID.

Args:

prefix_id: The prefix ID to use (overrides depth-based generation)

classmethod clear() None#

Clear all prefix ID state (both override and depth-based).

classmethod get() str#

Get the Dynamo prefix ID for the current context.

Returns the override prefix ID if set, otherwise returns a depth-based prefix ID that is unique per workflow run and call stack depth.

Returns:

The prefix ID string, never None.

classmethod is_set() bool#

Check if a Dynamo prefix ID is available (always True, IDs are auto-generated).

classmethod scope(prefix_id: str) collections.abc.Iterator[None]#

Context manager for scoped override prefix ID usage.

Sets an override prefix ID on entry and restores the previous state on exit, ensuring proper cleanup even if exceptions occur. Supports nesting.

Args:

prefix_id: The override prefix ID for this scope

Yields:

None

Usage:
with DynamoPrefixContext.scope("eval-q001"):
    # All LLM calls here will use "eval-q001" prefix
    await llm.ainvoke(...)

class DynamoModelConfig(/, **data: Any)#

Bases: nat.llm.openai_llm.OpenAIModelConfig

A Dynamo LLM provider with automatic nvext.agent_hints and nvext.cache_control injection for KV cache optimization.

This is a specialized OpenAI-compatible LLM that sends Dynamo routing hints for optimal KV cache management and request routing. Hints are injected when enable_nvext_hints is True. The hint parameters (nvext_prefix_total_requests, nvext_prefix_osl, nvext_prefix_iat) are optimizable via the NAT optimizer.

All hints are sent via nvext.agent_hints in the request body. Standard Dynamo fields (latency_sensitivity, osl, priority) are consumed by Dynamo’s built-in router and engine scheduler. Custom fields (prefix_id, total_requests, iat) are consumed by the custom processor.py.

To disable hints, set enable_nvext_hints: false in your config (the default).
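A hypothetical configuration fragment enabling hints. The field names come from this class; the surrounding `llms` section layout and the registered `_type` name are assumptions about a typical NAT workflow config and may differ in your setup:

```yaml
llms:
  my_dynamo_llm:
    _type: dynamo_llm        # hypothetical registered provider name
    model_name: my-model
    enable_nvext_hints: true
    nvext_prefix_total_requests: 10
    nvext_prefix_osl: 512
    nvext_prefix_iat: 250
```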


enable_nvext_hints: bool = None#
nvext_prefix_id_template: str | None = None#
nvext_prefix_total_requests: int = None#
nvext_prefix_osl: int = None#
nvext_prefix_iat: int = None#
request_timeout: float = None#
nvext_prediction_trie_path: str | None = None#
nvext_cache_pin_type: CachePinType | None = None#
nvext_cache_control_mode: CacheControlMode = None#
nvext_max_sensitivity: int = None#
classmethod _coerce_nvext_prefix_osl(v: object) int#

Convert categorical OSL strings (LOW/MEDIUM/HIGH) to representative token counts.

classmethod _coerce_nvext_prefix_iat(v: object) int#

Convert categorical IAT strings (LOW/MEDIUM/HIGH) to representative millisecond values.

static get_dynamo_field_names() frozenset[str]#

Get the set of Dynamo-specific field names for model_dump exclusion.

Use this when building config dicts for framework clients to exclude Dynamo-specific parameters that should not be passed to the underlying client.

Returns:

A frozenset of Dynamo-specific field names.

Example:

config_dict = config.model_dump(
    exclude={"type", "thinking", *DynamoModelConfig.get_dynamo_field_names()},
    ...
)
class _DynamoTransport(
transport: httpx.AsyncBaseTransport,
total_requests: int,
osl: int,
iat: int,
prediction_lookup: PredictionTrieLookup | None = None,
cache_pin_type: CachePinType | None = CachePinType.EPHEMERAL,
cache_control_mode: CacheControlMode = CacheControlMode.ALWAYS,
max_sensitivity: int = 1000,
)#

Bases: httpx.AsyncBaseTransport

Custom transport wrapper that injects all routing hints into nvext.agent_hints.

This approach is more reliable than event hooks because it modifies the request BEFORE httpx’s internal state machine processes it.

All hints are placed in a single nvext.agent_hints dict:

  • Standard Dynamo fields (latency_sensitivity, osl, priority): consumed by Dynamo’s built-in router and engine scheduler.

  • Custom routing fields (prefix_id, total_requests, iat): consumed by the custom processor.py for Thompson Sampling worker selection.
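A dependency-free sketch of the body rewrite this transport performs (the `merge_agent_hints` helper is hypothetical; the real implementation does this inside handle_async_request before delegating to the wrapped transport):

```python
import json

def merge_agent_hints(content: bytes, hints: dict) -> bytes:
    """Merge routing hints into nvext.agent_hints of a JSON request body,
    preserving any hints the caller already set."""
    body = json.loads(content or b"{}")
    body.setdefault("nvext", {}).setdefault("agent_hints", {}).update(hints)
    return json.dumps(body).encode()

payload = merge_agent_hints(
    b'{"model": "m", "messages": []}',
    {"prefix_id": "run-1-d0", "total_requests": 8, "iat": 250},
)
```

Rewriting the body at the transport layer (rather than via event hooks) means the content-length and request state are finalized before httpx's state machine sees the request, which is why the module takes this approach.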

_transport#
_total_requests#
_osl#
_iat#
_prediction_lookup = None#
_cache_pin_type#
_cache_control_mode#
_max_sensitivity = 1000#
_call_counts: dict[str, int]#
_call_counts_lock#
async handle_async_request(request: httpx.Request) httpx.Response#
async aclose() None#

Close the underlying transport.

create_httpx_client_with_dynamo_hooks(
total_requests: int,
osl: int,
iat: int,
timeout: float = 600.0,
prediction_lookup: PredictionTrieLookup | None = None,
cache_pin_type: CachePinType | None = CachePinType.EPHEMERAL,
cache_control_mode: CacheControlMode = CacheControlMode.ALWAYS,
max_sensitivity: int = 1000,
) httpx.AsyncClient#

Create an httpx.AsyncClient with Dynamo hint injection via custom transport.

This client can be passed to the OpenAI SDK or wrapped in an AsyncOpenAI client for use with LiteLLM/ADK. All hints are injected into nvext.agent_hints in the request body.

Args:

total_requests: Expected number of requests for this prefix

osl: Expected output tokens (raw integer, always sent as int in agent_hints)

iat: Expected inter-arrival time in ms (raw integer, always sent as int)

timeout: HTTP request timeout in seconds

prediction_lookup: Optional PredictionTrieLookup for dynamic hint injection

cache_pin_type: Cache pinning strategy. When set, injects nvext.cache_control with TTL. Set to None to disable.

cache_control_mode: When to inject cache_control: 'always' or 'first_only' per prefix.

max_sensitivity: Maximum latency sensitivity for computing priority

Returns:

An httpx.AsyncClient configured with Dynamo hint injection.

async dynamo_llm(
config: DynamoModelConfig,
_builder: nat.builder.builder.Builder,
)#

Register the Dynamo LLM provider.