nat.experimental.relay_telemetry_bridge#

Bridge NeMo Relay ATOF events into the NAT intermediate-step stream.

NeMo Relay emits Agent Trajectory Observability Format (ATOF) lifecycle events. The NVIDIA NeMo Agent Toolkit telemetry pipeline consumes IntermediateStep objects. This module maps ATOF scope and mark events to IntermediateStep so Relay-instrumented subprocesses can be folded into the active NAT trace.

Attributes#

Functions#

load_atof_jsonl(→ list[RelayEvent])

Load Relay ATOF events from a JSONL file.

relay_events_to_intermediate_steps(...)

Convert Relay ATOF events to NAT intermediate steps.

inject_atof_jsonl(...)

Load Relay ATOF JSONL and inject it into the current NAT stream.

_scope_event_to_step(...)

_mark_event_to_steps(...)

_event_types_for_category(...)

_event_uuid(→ str)

_parent_id(→ str)

_event_name(→ str)

_timestamp_seconds(→ float)

_trace_metadata(...)

_usage_info(...)

_find_token_usage(→ dict[str, Any] | None)

_first_int(→ int)

Module Contents#

RelayEvent#
_CATEGORY_TO_EVENT_TYPES: dict[str, tuple[nat.data_models.intermediate_step.IntermediateStepType, nat.data_models.intermediate_step.IntermediateStepType]]#
load_atof_jsonl(path: str | pathlib.Path) list[RelayEvent]#

Load Relay ATOF events from a JSONL file.

relay_events_to_intermediate_steps(
events: collections.abc.Iterable[RelayEvent],
*,
root_parent_id: str = 'root',
function_ancestry: nat.data_models.invocation_node.InvocationNode | None = None,
) list[nat.data_models.intermediate_step.IntermediateStep]#

Convert Relay ATOF events to NAT intermediate steps.

Parameters#

events:

Relay ATOF events, usually loaded from an ATOF JSONL exporter.

root_parent_id:

NAT span id to use when a Relay event has no parent_uuid. Use the current NAT active span id to nest Relay telemetry under the adapter call that launched Relay.

function_ancestry:

NAT invocation node to attach to emitted steps. Defaults to a synthetic Relay bridge node.

inject_atof_jsonl(
path: str | pathlib.Path,
*,
context: nat.builder.context.Context | None = None,
) list[nat.data_models.intermediate_step.IntermediateStep]#

Load Relay ATOF JSONL and inject it into the current NAT stream.

_scope_event_to_step(
event: RelayEvent,
root_parent_id: str,
function_ancestry: nat.data_models.invocation_node.InvocationNode,
start_timestamps: dict[str, float],
event_uuids: set[str],
) nat.data_models.intermediate_step.IntermediateStep | None#
_mark_event_to_steps(
event: RelayEvent,
root_parent_id: str,
function_ancestry: nat.data_models.invocation_node.InvocationNode,
event_uuids: set[str],
) list[nat.data_models.intermediate_step.IntermediateStep]#
_event_types_for_category(
category: str,
) tuple[nat.data_models.intermediate_step.IntermediateStepType, nat.data_models.intermediate_step.IntermediateStepType]#
_event_uuid(event: RelayEvent) str#
_parent_id(
event: RelayEvent,
root_parent_id: str,
event_uuids: set[str],
) str#
_event_name(event: RelayEvent) str#
_timestamp_seconds(value: Any) float#
_trace_metadata(
event: RelayEvent,
) nat.data_models.intermediate_step.TraceMetadata#
_usage_info(
event: RelayEvent,
) nat.data_models.intermediate_step.UsageInfo | None#
_find_token_usage(value: Any) dict[str, Any] | None#
_first_int(values: dict[str, Any], *keys: str) int#