nat.plugins.a365.telemetry.register#

Attributes#

Classes#

TokenExtractor

Callable that extracts a bearer token from NAT's AuthResult.

_AgentTokenCache

Thread-safe token cache keyed by (agent_id, tenant_id).

A365TelemetryExporter

A telemetry exporter to transmit traces to Microsoft Agent 365 backend.

Functions#

_default_token_extractor(→ str | None)

Default extractor: BearerTokenCred or HeaderCred(Authorization).

register_token_extractor(→ None)

Register a custom token extractor for A365 telemetry.

_get_token_extractor(...)

_raise_no_bearer_token(→ NoReturn)

Raise A365AuthenticationError with a clear message when no token could be extracted.

a365_telemetry_exporter(config, builder)

Create an Agent 365 telemetry exporter.

Module Contents#

logger#
_TOKEN_EXTRACTOR_SUPPORTED = 'BearerTokenCred or HeaderCred(Authorization)'#
class TokenExtractor#

Bases: Protocol

Callable that extracts a bearer token from NAT’s AuthResult.

Used when the default (BearerTokenCred or HeaderCred(Authorization)) does not match your auth provider’s credential shape. Register a custom extractor with register_token_extractor(name, callable) and set token_extractor=name in config.

_default_token_extractor(
auth_result: nat.data_models.authentication.AuthResult,
) str | None#

Default extractor: BearerTokenCred or HeaderCred(Authorization).

Returns the bearer token string, or None if neither credential type is present. Caller should raise A365AuthenticationError with a clear message when None.

_TOKEN_EXTRACTOR_REGISTRY: dict[str, collections.abc.Callable[[nat.data_models.authentication.AuthResult], str | None]]#
register_token_extractor(
name: str,
extractor: collections.abc.Callable[[nat.data_models.authentication.AuthResult], str | None],
) None#

Register a custom token extractor for A365 telemetry.

Use when your auth provider returns credentials in a shape the default extractor does not understand (e.g. a new NAT credential type). Then set token_extractor=”name” in your a365 telemetry exporter config.

Args:

name: Name to use in config (e.g. “my_provider”). extractor: Callable (AuthResult) -> str | None. Return the bearer token or None.

_get_token_extractor(
name: str | None,
) collections.abc.Callable[[nat.data_models.authentication.AuthResult], str | None]#
_raise_no_bearer_token(
auth_result: nat.data_models.authentication.AuthResult,
) NoReturn#

Raise A365AuthenticationError with a clear message when no token could be extracted.

class _AgentTokenCache#

Thread-safe token cache keyed by (agent_id, tenant_id).

A process can host multiple A365 agents in the same workflow; each agent’s bearer token must be tracked separately because the A365 backend partitions telemetry by gen_ai.agent.id and validates the token’s appid claim against that route segment.

_lock#
_entries: dict[tuple[str | None, str | None], tuple[str, datetime.datetime | None]]#
static _expires_at_utc(
expires_at: datetime.datetime | None,
) datetime.datetime | None#
get_token(agent_id: str | None, tenant_id: str | None) str | None#

Return cached token for the key if still valid (5 min buffer), else None.

update_token(
agent_id: str | None,
tenant_id: str | None,
*,
token: str,
expires_at: datetime.datetime | None,
) None#
is_expiring_soon(
agent_id: str | None,
tenant_id: str | None,
buffer_minutes: int = 5,
) bool#

True if the cached token is unset, expired, or expiring within buffer_minutes.

class A365TelemetryExporter(/, **data: Any)#

Bases: nat.observability.mixin.batch_config_mixin.BatchConfigMixin, nat.data_models.telemetry_exporter.TelemetryExporterBaseConfig

A telemetry exporter to transmit traces to Microsoft Agent 365 backend.

Auth: the referenced auth provider should return a bearer token via BearerTokenCred or HeaderCred(Authorization). For other credential shapes, register a custom token extractor with register_token_extractor(name, callable) and set token_extractor=name.

Identity: when wired through the A365 front-end, each turn’s agent and tenant ids are sourced from context.activity.get_agentic_instance_id() and context.activity.get_agentic_tenant_id(). The agent_id and tenant_id fields below are fallbacks used only when no per-turn identity is available (e.g., CLI workflows or non-agentic activities).

Limitations to be aware of:

  • Single auth provider: token_resolver is a single AuthenticationRef whose MSAL client_id becomes the appid claim of every emitted token. Hosting multiple A365 agents with distinct app registrations in the same process is not currently supported — only the agent matching the auth provider’s client_id will pass A365’s token-validation check.

  • Cross-turn batching: identity is read at export time from a contextvar. BatchingProcessor schedules a flush task on first enqueue; that task inherits a copy of the contextvar at creation time. Spans queued by later turns into the same batch are exported under the first turn’s identity. For single-bot deployments this is invisible; for multi-bot deployments it can intermittently misattribute telemetry.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

agent_id: str | None = None#
tenant_id: str | None = None#
token_resolver: nat.data_models.component_ref.AuthenticationRef = None#
token_extractor: str | None = None#
cluster_category: str = None#
use_s2s_endpoint: bool = None#
suppress_invoke_agent_input: bool = None#
async a365_telemetry_exporter(
config: A365TelemetryExporter,
builder: nat.builder.builder.Builder,
)#

Create an Agent 365 telemetry exporter.

Auth resolution is deferred to first use of a given (agent_id, tenant_id) pair: the synchronous SDK callback returns whatever is in the cache, and the async helper on A365OtelExporter populates / refreshes entries before each export. This supports multi-agent processes (one cache entry per serving agent) and keeps WorkflowBuilder.__aenter__ free of auth I/O.