nat.utils.telemetry#

Opt-out runtime telemetry for the NAT CLI.

See nat.utils.telemetry.config for the environment variables. Telemetry is enabled by default and stays on until the user opts out. To turn telemetry off for a single invocation:

NAT_TELEMETRY_ENABLED=false nat run …

To inspect events locally without making network calls:

NAT_TELEMETRY_ENDPOINT=stdout nat run …

Submodules#

Attributes#

TELEMETRY_ENABLED

Master opt-out flag. Initialized at import; may be flipped by the first-run consent prompt during a CLI invocation.

Classes#

ConsentState

Enum where members are also (and must be) strings

CliCommandEvent

Single invocation of a top-level NAT CLI command (e.g. nat run).

NemoSourceEnum

The NeMo product that emitted the event. Discriminator across NeMo products sharing the NeMo Usage Telemetry schema.

TaskStatusEnum

Outcome of the task being reported.

TelemetryEvent

Base class for all NAT telemetry events.

NATTelemetryHandler

Batches, flushes, and retries NAT telemetry events.

Functions#

maybe_prompt_for_consent(→ None)

Run the first-run consent prompt if needed.

read_persisted_consent(→ ConsentState)

Read the user's persisted consent decision, if any.

write_persisted_consent(→ None)

Persist the user's consent decision.

Package Contents#

TELEMETRY_ENABLED: bool#

Master opt-out flag. Initialized at import; may be flipped by the first-run consent prompt during a CLI invocation. Consumers should read this attribute through the module on each access rather than binding it with from … import TELEMETRY_ENABLED, which copies the value at import time and misses later updates.
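The difference between a live module attribute and a name copied at import time can be shown with a stand-in module. This is a minimal sketch: fake_config and is_enabled_live are hypothetical names used only for illustration, not part of the NAT package.

```python
import types

# Stand-in for the real config module; "fake_config" is a hypothetical name.
fake_config = types.ModuleType("fake_config")
fake_config.TELEMETRY_ENABLED = True

# Equivalent of `from fake_config import TELEMETRY_ENABLED`: the value is copied once.
snapshot = fake_config.TELEMETRY_ENABLED

# Simulate the consent prompt flipping the flag later in the same process.
fake_config.TELEMETRY_ENABLED = False


def is_enabled_live() -> bool:
    """Read the flag through the module on every call, so updates are seen."""
    return fake_config.TELEMETRY_ENABLED


print("copied at import time:", snapshot)
print("read through the module:", is_enabled_live())
```

The copied name keeps the value it had when it was bound, while the attribute lookup reflects the later update.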

class ConsentState#

Bases: enum.StrEnum

Enum where members are also (and must be) strings

ENABLED = 'enabled'#
DISABLED = 'disabled'#
NEVER_ASKED = 'never_asked'#

maybe_prompt_for_consent(→ None)#

Run the first-run consent prompt if needed.

Called by the CLI entrypoint group callback. No-op when:

  • NAT_TELEMETRY_ENABLED env var is set (the user opted via env).

  • A persisted consent decision already exists.

  • The session is non-interactive — see is_interactive_session(), which requires stdin, stdout, and stderr to all be TTYs.

Otherwise: print the prompt, read the user’s answer, persist the decision, and update the live TELEMETRY_ENABLED flag so the rest of this same invocation honors the choice.
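The no-op conditions above can be sketched as a single predicate. This is an illustrative sketch only: should_prompt and all_ttys are hypothetical helpers, and the real function may check these conditions in a different order or through different internals.

```python
import sys
from typing import Iterable, Mapping


def all_ttys(streams: Iterable) -> bool:
    """True only if every stream is attached to a terminal."""
    return all(stream.isatty() for stream in streams)


def should_prompt(env: Mapping[str, str], has_persisted_decision: bool, interactive: bool) -> bool:
    """Return True only when none of the documented no-op conditions applies."""
    if "NAT_TELEMETRY_ENABLED" in env:
        return False  # the user already decided via the environment
    if has_persisted_decision:
        return False  # a consent decision is already on disk
    if not interactive:
        return False  # never block a pipe, CI job, or redirected run
    return True


# Interactive means stdin, stdout, and stderr are all TTYs.
interactive_now = all_ttys([sys.stdin, sys.stdout, sys.stderr])
print(should_prompt({}, has_persisted_decision=False, interactive=interactive_now))
```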

read_persisted_consent(→ ConsentState)#

Read the user’s persisted consent decision, if any.

Asymmetric prompt_version gating, designed around user trust:

  • Current prompt_version: return the persisted state as-is.

  • Stale or missing prompt_version with consent = "disabled": return DISABLED. A user who explicitly opted out under any version of the prompt must remain opted out: we never silently re-enable telemetry for someone who said no, even if we materially change the disclosure.

  • Stale or missing prompt_version with consent = "enabled": return NEVER_ASKED to force a re-prompt under the new disclosure. A stale “yes” from a previous prompt version should not silently authorize collection under a new (potentially broader) disclosure.

  • File missing, malformed, or unrecognized consent value: return NEVER_ASKED.

The asymmetry is the key: re-prompting an already-disabled user combined with the default-yes prompt would be a silent opt-in flip — the worst possible privacy regression. Re-prompting an already-enabled user is conservative and respects the new disclosure.
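The gating rules above can be expressed as a small pure function over the parsed file contents. This is a sketch under assumptions: resolve_consent and CURRENT_PROMPT_VERSION are hypothetical names, the version is assumed to be an integer, and the enum is declared with (str, Enum) here only so the example also runs on Python versions without enum.StrEnum.

```python
from enum import Enum
from typing import Optional

CURRENT_PROMPT_VERSION = 2  # hypothetical value, for illustration only


class ConsentState(str, Enum):
    ENABLED = "enabled"
    DISABLED = "disabled"
    NEVER_ASKED = "never_asked"


def resolve_consent(record: Optional[dict]) -> ConsentState:
    """Map a parsed consent record (or None if missing/malformed) to a state."""
    if not isinstance(record, dict):
        return ConsentState.NEVER_ASKED
    consent = record.get("consent")
    if consent not in ("enabled", "disabled"):
        return ConsentState.NEVER_ASKED  # unrecognized or absent value
    if record.get("prompt_version") == CURRENT_PROMPT_VERSION:
        return ConsentState(consent)  # current version: trust it as-is
    if consent == "disabled":
        return ConsentState.DISABLED  # an opt-out survives any prompt change
    return ConsentState.NEVER_ASKED  # a stale "yes" forces a re-prompt


print(resolve_consent({"consent": "disabled", "prompt_version": 1}))
print(resolve_consent({"consent": "enabled", "prompt_version": 1}))
```

Only the stale "enabled" case is downgraded; every path that starts from "disabled" ends at DISABLED.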

write_persisted_consent(→ None)#

Persist the user’s consent decision.

Writes a small TOML file at the resolved consent path. Silently ignores write failures (filesystem permission errors, full disk, etc.) — the next interactive run will simply re-prompt.
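A best-effort write of this kind might look like the sketch below. The helper name write_consent, the file location, and the integer prompt_version are assumptions made for illustration; only the consent and prompt_version keys come from the description above.

```python
import tempfile
from pathlib import Path


def write_consent(path: Path, consent: str, prompt_version: int) -> None:
    """Write a two-key TOML file; swallow filesystem errors instead of raising."""
    body = f'consent = "{consent}"\nprompt_version = {prompt_version}\n'
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(body, encoding="utf-8")
    except OSError:
        # Permission errors, a full disk, etc.: give up quietly.
        # The next interactive run would simply prompt again.
        pass


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "nat" / "consent.toml"  # hypothetical location
    write_consent(target, "disabled", 1)
    written = target.read_text(encoding="utf-8")

    # A regular file where a directory is expected makes the write fail quietly.
    blocker = Path(tmp) / "blocker"
    blocker.write_text("x", encoding="utf-8")
    write_consent(blocker / "consent.toml", "enabled", 1)  # no exception raised

print(written)
```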

class CliCommandEvent(/, **data: Any)#

Bases: TelemetryEvent

Single invocation of a top-level NAT CLI command (e.g. nat run).

Privacy: this schema is deliberately minimal. It must never carry command arguments, option values, file paths, config contents, workflow/function names, hostnames, usernames, or any other user-supplied strings. The only free-form string is error_class, which is the exception class name on failure (never the message).
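The error_class rule can be illustrated with a plain dictionary built from the field names listed for this class. This is a sketch, not the library's code: cli_event_fields is a hypothetical helper, and the exit code of 1 and the empty error_class on success are assumptions.

```python
import platform
from typing import Optional


def cli_event_fields(command: str, duration_ms: int, exc: Optional[BaseException] = None) -> dict:
    """Build event fields without copying any user-supplied text."""
    failed = exc is not None
    return {
        "nemo_source": "agent_toolkit",
        "command": command,
        "task_status": "failure" if failed else "success",
        "duration_ms": duration_ms,
        "exit_code": 1 if failed else 0,  # assumed mapping
        # Class name only: the exception message may contain paths or usernames.
        "error_class": type(exc).__name__ if failed else "",
        "python_version": platform.python_version(),
    }


fields = cli_event_fields("run", 1250, FileNotFoundError("/home/alice/secret.yml"))
print(fields["error_class"])
```

The path in the exception message never reaches the event; only the class name FileNotFoundError does.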

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

_event_name: ClassVar[str] = 'nat_cli_command'#
nemo_source: NemoSourceEnum = None#
command: str = None#
subcommand: str = None#
task_status: TaskStatusEnum = None#
duration_ms: int = None#
exit_code: int = None#
error_class: str = None#
python_version: str = None#
model_config#

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class NemoSourceEnum#

Bases: enum.StrEnum

The NeMo product that emitted the event. Discriminator across NeMo products sharing the NeMo Usage Telemetry schema.

Values mirror the schema-1.5 NemoSourceEnum definition in nemo-telemetry/schemas/anonymous_events.json. NAT itself only emits AGENT_TOOLKIT; the other values exist so this enum is a faithful mirror of the published schema (e.g. tests can deserialize foreign payloads, and future upstream additions surface as diff conflicts here).

INFERENCE = 'inference'#
AUDITOR = 'auditor'#
DATADESIGNER = 'datadesigner'#
EVALUATOR = 'evaluator'#
GUARDRAILS = 'guardrails'#
SAFE_SYNTHESIZER = 'safe-synthesizer'#
ANONYMIZER = 'anonymizer'#
AGENT_TOOLKIT = 'agent_toolkit'#
UNDEFINED = 'undefined'#

class TaskStatusEnum#

Bases: enum.StrEnum

Outcome of the task being reported.

Values mirror the schema-1.5 TaskStatusEnum definition. NAT’s CliCommandEvent only emits SUCCESS / FAILURE / INTERRUPTED; the other values exist for schema-mirror parity with other NeMo products.

SUCCESS = 'success'#
FAILURE = 'failure'#
COMPLETED = 'completed'#
ERROR = 'error'#
CANCELED = 'canceled'#
INTERRUPTED = 'interrupted'#
UNDEFINED = 'undefined'#

class TelemetryEvent(/, **data: Any)#

Bases: pydantic.BaseModel

Base class for all NAT telemetry events.

Subclasses must set _event_name as a ClassVar. Attempting to define a subclass without it raises TypeError at class-creation time.
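One common way to enforce such a rule at class-creation time is __init_subclass__. The sketch below uses a plain Python base class named EventBase (a hypothetical stand-in) rather than pydantic.BaseModel, so the real implementation may differ in mechanism.

```python
from typing import ClassVar


class EventBase:
    """Plain-Python stand-in for the telemetry base class."""

    _schema_version: ClassVar[str] = "1.5"

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        name = getattr(cls, "_event_name", None)
        if not isinstance(name, str) or not name:
            raise TypeError(f"{cls.__name__} must define _event_name as a ClassVar[str]")


class PingEvent(EventBase):
    _event_name: ClassVar[str] = "ping"


try:
    class BrokenEvent(EventBase):  # no _event_name: rejected while the class is created
        pass
except TypeError as err:
    message = str(err)

print(message)
```

The failure happens when the class statement runs, so a missing name is caught at import time rather than when the first event is sent.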

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

_event_name: ClassVar[str]#
_schema_version: ClassVar[str] = '1.5'#

class NATTelemetryHandler(
flush_interval_seconds: float = DEFAULT_FLUSH_INTERVAL_SECONDS,
max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE,
max_retries: int = DEFAULT_MAX_RETRIES,
request_timeout_seconds: float = DEFAULT_REQUEST_TIMEOUT_SECONDS,
source_client_version: str = CLIENT_VERSION,
session_id: str = 'undefined',
)#

Batches, flushes, and retries NAT telemetry events.

The handler is a no-op when the global TELEMETRY_ENABLED flag is false: enqueue() drops every event immediately and the timer loop has nothing to send. Lifecycle methods remain safe to call regardless.

Parameters#

flush_interval_seconds:

Periodic flush cadence used by the background timer loop.

max_queue_size:

When the in-memory queue reaches this size, an early flush is triggered.

max_retries:

Maximum re-send attempts per event before it is dropped.

request_timeout_seconds:

Per-request HTTP timeout. Bounds telemetry-induced latency.

source_client_version:

Reported as clientVer in the wire envelope. Defaults to the installed nvidia-nat-core version.

session_id:

Identifier used to group related events. NAT_SESSION_PREFIX is prepended if set.
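The interaction between the enabled flag and max_queue_size can be sketched with a small synchronous queue. This is a simplified illustration, not the handler itself: BatchingQueue, is_enabled, and send are hypothetical names, and the real handler flushes asynchronously through its timer loop rather than inline.

```python
from typing import Any, Callable, List


class BatchingQueue:
    """Collect events and hand them to `send` in batches."""

    def __init__(self, max_queue_size: int, is_enabled: Callable[[], bool],
                 send: Callable[[List[Any]], None]) -> None:
        self._max_queue_size = max_queue_size
        self._is_enabled = is_enabled  # read on every enqueue, never cached
        self._send = send
        self._events: List[Any] = []

    def enqueue(self, event: Any) -> None:
        if not self._is_enabled():
            return  # disabled: drop the event immediately
        self._events.append(event)
        if len(self._events) >= self._max_queue_size:
            self.flush()  # early flush when the queue is full

    def flush(self) -> None:
        if not self._events:
            return
        batch, self._events = self._events, []
        self._send(batch)


flag = {"enabled": True}
sent: List[List[Any]] = []
queue = BatchingQueue(max_queue_size=2, is_enabled=lambda: flag["enabled"], send=sent.append)
queue.enqueue("a")
queue.enqueue("b")  # reaches max_queue_size and triggers an early flush
flag["enabled"] = False
queue.enqueue("c")  # dropped because the flag is now false
queue.flush()       # nothing queued, so nothing is sent
print(sent)
```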

_flush_interval = 120.0#
_max_queue_size = 50#
_max_retries = 3#
_request_timeout = 2.0#
_source_client_version#
_session_id#
_events: list[nat.utils.telemetry.payload.QueuedEvent] = []#
_dlq: list[nat.utils.telemetry.payload.QueuedEvent] = []#
_flush_signal#
_timer_task: asyncio.Task | None = None#
_running = False#

enqueue(event: nat.utils.telemetry.events.TelemetryEvent) → None#

Queue an event for the next flush. Silently no-ops when disabled.

Reads config.TELEMETRY_ENABLED live (not via cached import) so the first-run consent prompt’s late update to the flag is honored.

async astart() → None#
async astop() → None#
async aflush() → None#
start() → None#
stop() → None#
flush() → None#

_run_sync(coro: Any) → Any#

Run an async coroutine from sync code, even if a loop is running.
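A typical way to get this behavior is to use asyncio.run directly when no loop is running and to fall back to a helper thread with its own loop otherwise. The sketch below is an assumption about the approach, not the handler's actual code; run_sync is a hypothetical name.

```python
import asyncio
import threading
from typing import Any, Coroutine


def run_sync(coro: Coroutine[Any, Any, Any]) -> Any:
    """Run `coro` to completion from synchronous code."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: the simple path works.
        return asyncio.run(coro)

    # A loop is already running here, so asyncio.run() would raise.
    # Run the coroutine on a fresh loop in a helper thread instead.
    outcome: dict = {}

    def worker() -> None:
        try:
            outcome["value"] = asyncio.run(coro)
        except BaseException as exc:  # re-raised in the caller below
            outcome["error"] = exc

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    thread.join()
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


async def caller_inside_loop() -> int:
    return run_sync(add(2, 3))  # called while a loop is running


print(run_sync(add(1, 2)))
print(asyncio.run(caller_inside_loop()))
```

The fallback blocks the calling thread until the helper thread finishes, so it suits short operations such as a final flush.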

async _timer_loop() → None#
async _flush_events() → None#
async _send_events(
events: list[nat.utils.telemetry.payload.QueuedEvent],
) → None#
async _send_with_client(
client: httpx.AsyncClient,
events: list[nat.utils.telemetry.payload.QueuedEvent],
payload: dict[str, Any],
) → None#
_add_to_dlq(
events: list[nat.utils.telemetry.payload.QueuedEvent],
) → None#
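The page does not describe how the dead-letter queue interacts with max_retries, so the following is only one plausible bookkeeping scheme, offered as an assumption. Queued, its retry_count field, and park_failed are hypothetical names, not the fields of QueuedEvent.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Queued:
    """Hypothetical stand-in for a queued event with a retry counter."""
    name: str
    retry_count: int = 0


def park_failed(dlq: List[Queued], failed: List[Queued], max_retries: int) -> int:
    """Move failed events to the dead-letter queue; return how many were dropped."""
    dropped = 0
    for event in failed:
        if event.retry_count >= max_retries:
            dropped += 1  # out of attempts: discard instead of retrying forever
            continue
        event.retry_count += 1
        dlq.append(event)  # eligible for a re-send on a later flush
    return dropped


dlq: List[Queued] = []
fresh = Queued("fresh")
exhausted = Queued("exhausted", retry_count=3)
dropped = park_failed(dlq, [fresh, exhausted], max_retries=3)
print([event.name for event in dlq], dropped)
```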