nat.atof#

Pydantic models for the Agentic Trajectory Observability Format (ATOF).

ATOF is a JSON-Lines wire format for agent runtime event streams. These models define the two event kinds (ScopeEvent, MarkEvent), the behavioral flag enum (Flags), and the canonical category vocabulary (Category).

See atof-event-format.md for the core wire format. For payload extraction, see nat.atof.extractors (schema-map-driven LLM extractors for OpenAI, Anthropic, and Gemini). How producers should declare their schemas to consumers is still an open question, deferred to a future spec revision; see the DESIGN NOTE block at the top of nat.atof.schemas.

Classes#

MarkEvent

Point-in-time checkpoint (spec §3.2).

ScopeEvent

Scope lifecycle event (spec §3.1).

LlmPayloadExtractor

Extracts ATIF-relevant fields from an llm scope event's data.

MarkPayloadExtractor

Classifies a mark event payload as either a role-lifted step (user/system/agent) or an opaque system step.

SchemaMap

Declarative description of where ATIF-relevant fields live within a provider’s LLM payload, plus optional hooks for irreducible transforms.

SchemaMapLlmExtractor

Generic LLM payload extractor driven by a SchemaMap.

ToolPayloadExtractor

Extracts a serialized result string from a tool scope-end payload.

Flags

Canonical behavioral flags for scope events (spec §2.1).

Functions#

register_anthropic_messages_v1(→ None)

Install the Anthropic Messages JSON Schema and LLM extractor.

register_gemini_generate_content_v1(→ None)

Install the Gemini generateContent JSON Schema and LLM extractor.

register_llm_extractor(→ None)

Register an LLM payload extractor for (name, version).

register_mark_extractor(→ None)

Register a mark payload extractor for (name, version).

register_tool_extractor(→ None)

Register a tool payload extractor for (name, version).

read_jsonl(→ list[nat.atof.events.Event])

Read an ATOF JSON-Lines file and return a list of typed Event objects.

write_jsonl(→ None)

Write a list of Event objects to a JSON-Lines file.

lookup_schema(→ dict[str, Any] | None)

Return the registered schema for (name, version) or None.

register_schema(→ None)

Register a JSON Schema for ATOF events whose data_schema matches {name, version}.

Package Contents#

Category#
Event#

Discriminated union of the two ATOF event kinds, keyed on kind (spec §3).

class MarkEvent(/, **data: Any)#

Bases: _EventBase

Point-in-time checkpoint (spec §3.2).

Unpaired (no start/end semantics). MAY carry category + category_profile to indicate the kind of work the checkpoint relates to; when both are absent, the mark is a generic named timestamp. Does NOT carry scope_category or attributes.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

kind: Literal['mark'] = 'mark'#
category: str | None = None#
category_profile: dict[str, Any] | None = None#
_validate_category_subtype_coherence() Self#
_reject_scope_only_fields() Self#
class ScopeEvent(/, **data: Any)#

Bases: _EventBase

Scope lifecycle event (spec §3.1).

A single scope span produces two ScopeEvent instances sharing the same uuid: one with scope_category: "start" when the scope is pushed onto the active scope stack, and one with scope_category: "end" when the scope is popped.
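The start/end pairing described above can be sketched on plain dicts rather than on ScopeEvent instances. This is only an illustration: it relies on the kind, scope_category, and uuid keys named in this section, and the helper name pair_scopes is hypothetical, not part of nat.atof.

```python
from __future__ import annotations

from typing import Any


def pair_scopes(events: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Group scope events by uuid into {"start": ..., "end": ...} pairs.

    Hypothetical helper: it operates on raw dicts and is not part of nat.atof.
    """
    pairs: dict[str, dict[str, Any]] = {}
    for event in events:
        if event.get("kind") != "scope":
            continue  # mark events are unpaired, so skip them
        slot = pairs.setdefault(event["uuid"], {})
        slot[event["scope_category"]] = event
    return pairs


events = [
    {"kind": "scope", "scope_category": "start", "uuid": "a1"},
    {"kind": "mark", "uuid": "m1"},
    {"kind": "scope", "scope_category": "end", "uuid": "a1"},
    {"kind": "scope", "scope_category": "start", "uuid": "b2"},
]
pairs = pair_scopes(events)
# A scope with a start but no end has not been popped yet.
open_scopes = [uuid for uuid, pair in pairs.items() if "end" not in pair]
```

Keying on uuid means the two halves of a span can be matched even when other events are interleaved between them.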

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

kind: Literal['scope'] = 'scope'#
scope_category: Literal['start', 'end'] = None#
attributes: list[str] = None#
category: str = None#
category_profile: dict[str, Any] | None = None#
classmethod _canonicalize_attributes_field(v: Any) list[str]#
classmethod _validate_category(v: str) str#
_validate_category_subtype_coherence() Self#
ANTHROPIC_MESSAGES_V1_MAP#
GEMINI_GENERATE_CONTENT_V1_MAP#
LLM_EXTRACTOR_REGISTRY: dict[tuple[str, str], LlmPayloadExtractor]#
MARK_EXTRACTOR_REGISTRY: dict[tuple[str, str], MarkPayloadExtractor]#
OPENAI_CHAT_COMPLETIONS_V1_MAP#
TOOL_EXTRACTOR_REGISTRY: dict[tuple[str, str], ToolPayloadExtractor]#
class LlmPayloadExtractor#

Bases: Protocol

Extracts ATIF-relevant fields from an llm scope event’s data.

Implementations MUST be pure functions over data — no side effects, no network, no filesystem access. Return empty collections or strings when a field is not present; the converter distinguishes “legitimately empty” from “shape mismatch” at the dispatch layer.

extract_input_messages(data: Any) list[dict[str, Any]]#

Return the chat history messages from an LLM scope-start payload.

Each message SHOULD carry role and content keys; content MAY be a string or a multimodal part list (ATIF v1.6+).

extract_output_text(data: Any) str#

Return the assistant text from an LLM scope-end payload.

Returns "" when the response carries only tool_calls or has no text content.

extract_tool_calls(data: Any) list[dict[str, Any]]#

Return the tool_calls issued by the assistant in this turn.

Each dict MUST carry tool_call_id, function_name, and arguments (dict). Returns [] when no tool was called.
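A minimal sketch of a class that structurally matches the three methods above may help. The payload shape it reads (keys "messages", "text", and "calls") is invented for illustration and does not correspond to any registered provider; the class name ExampleLlmExtractor is likewise hypothetical.

```python
from __future__ import annotations

from typing import Any


class ExampleLlmExtractor:
    """Structural match for the three documented methods.

    The payload shape (keys "messages", "text", "calls") is hypothetical.
    """

    def extract_input_messages(self, data: Any) -> list[dict[str, Any]]:
        if not isinstance(data, dict):
            return []
        return [m for m in (data.get("messages") or []) if isinstance(m, dict)]

    def extract_output_text(self, data: Any) -> str:
        if not isinstance(data, dict):
            return ""
        text = data.get("text")
        # Tool-call-only responses carry no text, so return "".
        return text if isinstance(text, str) else ""

    def extract_tool_calls(self, data: Any) -> list[dict[str, Any]]:
        if not isinstance(data, dict):
            return []
        return [
            {
                "tool_call_id": call["id"],
                "function_name": call["name"],
                "arguments": dict(call.get("args") or {}),
            }
            for call in (data.get("calls") or [])
        ]


extractor = ExampleLlmExtractor()
end_payload = {"calls": [{"id": "c1", "name": "search", "args": {"q": "atof"}}]}
tool_calls = extractor.extract_tool_calls(end_payload)
```

Each method reads only its argument and returns an empty value on a missing field, which keeps it a pure function over data as the protocol requires.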

class MarkPayloadExtractor#

Bases: Protocol

Classifies a mark event payload as either a role-lifted step (user/system/agent) or an opaque system step.

extract_role_and_content(data: Any) tuple[str, Any] | None#

If the mark should lift to an ATIF step with a specific source, return (source, content). Otherwise return None to fall through to the opaque-system-step path.

source MUST be one of "user", "system", "agent". content is passed through as-is (string or part list).
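The return contract above can be illustrated with a small sketch. It assumes a mark payload shaped like {"role": ..., "content": ...}, which is a hypothetical shape chosen for the example; ExampleMarkExtractor is not a class shipped by nat.atof.

```python
from __future__ import annotations

from typing import Any

ALLOWED_SOURCES = {"user", "system", "agent"}


class ExampleMarkExtractor:
    """Lifts marks shaped like {"role": ..., "content": ...} (hypothetical shape)."""

    def extract_role_and_content(self, data: Any) -> tuple[str, Any] | None:
        if not isinstance(data, dict):
            return None
        role = data.get("role")
        if not isinstance(role, str) or role not in ALLOWED_SOURCES:
            return None  # fall through to the opaque-system-step path
        if "content" not in data:
            return None
        # Content is passed through unchanged (string or part list).
        return role, data["content"]


mark_extractor = ExampleMarkExtractor()
lifted = mark_extractor.extract_role_and_content({"role": "user", "content": "hi"})
opaque = mark_extractor.extract_role_and_content({"role": "tool", "content": "x"})
```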

class SchemaMap#

Declarative description of where ATIF-relevant fields live within a provider’s LLM payload, plus optional hooks for irreducible transforms.

A SchemaMap captures three things:

  1. Field paths — dotted paths (with numeric list indices) telling the engine where to find input messages, output text, and output tool calls. Each field accepts a tuple of candidate paths; the engine tries them in order and uses the first hit.

  2. Per-tool-call sub-paths — for providers whose tool-call shape fits the OpenAI flat-or-nested convention. Each tool call is a dict; these paths name where ID/name/arguments live within that dict.

  3. Optional hooks — escape hatches for the three transforms that can’t be expressed declaratively:

    • normalize_input_messages: input data → ATIF-shaped message list. Use when content is polymorphic (Anthropic string-or-blocks, Gemini parts) and a single field-path can’t flatten it.

    • normalize_output_message: output data → (text, tool_calls) pair. Use when output text and tool calls coexist in the same polymorphic structure (Anthropic content blocks).

    • transform_tool_call: per-call dict adapter. Use when tool calls don’t carry an ID (Gemini synthesizes from name+index) or use non-OpenAI nesting.

Hooks always win over paths. If normalize_output_message is set, the engine ignores output_text_paths and output_tool_calls_paths.

Pure-paths providers (OpenAI) leave the hooks at None. Mixed providers (Anthropic, Gemini) use one or two hooks.

Parameters:
  • name – Schema name (e.g. "openai/chat-completions").

  • version – Schema version string.

  • input_messages_paths – Candidate paths to the input messages array.

  • output_text_paths – Candidate paths to the output assistant text.

  • output_tool_calls_paths – Candidate paths to the output tool-calls array.

  • tool_call_id_paths – Candidate sub-paths for tool-call ID.

  • tool_call_name_paths – Candidate sub-paths for tool-call function name.

  • tool_call_args_paths – Candidate sub-paths for tool-call arguments.

  • tool_call_args_parse_json – When True, parse string arguments as JSON.

  • role_aliases – Map of provider role values to canonical role values (e.g., {"model": "assistant"} for Gemini). Applied to messages extracted via field paths; hooks bypass this.

  • normalize_input_messages – Optional hook overriding path-based input extraction. Signature: (data) -> list[{"role", "content", ...}].

  • normalize_output_message – Optional hook overriding path-based output extraction. Signature: (data) -> (text, tool_calls).

  • transform_tool_call – Optional per-call adapter. Signature: (raw_call_dict, index) -> ATIF-shaped {"tool_call_id", "function_name", "arguments"}. When set, replaces the per-tool-call path resolution entirely.
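A transform_tool_call hook with the documented (raw_call_dict, index) signature might look like the sketch below, for a provider whose calls carry a name and arguments but no ID. The function name synthesize_tool_call, the "name"/"args" input keys, and the "name-index" ID format are assumptions made for the example, not the behaviour of the shipped Gemini map.

```python
from __future__ import annotations

from typing import Any


def synthesize_tool_call(raw_call: dict[str, Any], index: int) -> dict[str, Any]:
    """Adapt a raw call without an ID into the ATIF-shaped dict.

    Hypothetical hook: the ID format "<name>-<index>" is an assumption.
    """
    name = str(raw_call.get("name", ""))
    return {
        "tool_call_id": f"{name}-{index}",
        "function_name": name,
        "arguments": dict(raw_call.get("args") or {}),
    }


raw_calls = [
    {"name": "get_weather", "args": {"city": "Oslo"}},
    {"name": "get_weather"},
]
calls = [synthesize_tool_call(raw, i) for i, raw in enumerate(raw_calls)]
```

Including the index in the synthesized ID keeps two calls to the same function within one turn distinguishable.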

name: str#
version: str#
input_messages_paths: tuple[str, Ellipsis] = ()#
output_text_paths: tuple[str, Ellipsis] = ()#
output_tool_calls_paths: tuple[str, Ellipsis] = ()#
tool_call_id_paths: tuple[str, Ellipsis] = ('id',)#
tool_call_name_paths: tuple[str, Ellipsis] = ('name', 'function.name')#
tool_call_args_paths: tuple[str, Ellipsis] = ('arguments', 'function.arguments')#
tool_call_args_parse_json: bool = True#
role_aliases: collections.abc.Mapping[str, str]#
normalize_input_messages: collections.abc.Callable[[Any], list[dict[str, Any]]] | None = None#
normalize_output_message: collections.abc.Callable[[Any], tuple[str, list[dict[str, Any]]]] | None = None#
transform_tool_call: collections.abc.Callable[[dict[str, Any], int], dict[str, Any]] | None = None#
class SchemaMapLlmExtractor(schema_map: SchemaMap)#

Generic LLM payload extractor driven by a SchemaMap.

Implements LlmPayloadExtractor by routing extraction through the map’s hooks (when set) or its declarative field paths (otherwise). A single instance per (name, version) is the intended pattern; register it with register_llm_extractor().

schema_map#
extract_input_messages(data: Any) list[dict[str, Any]]#
extract_output_text(data: Any) str#
extract_tool_calls(data: Any) list[dict[str, Any]]#
_apply_role_aliases(messages: list[Any]) list[dict[str, Any]]#
_extract_tool_call_fields(raw: dict[str, Any]) dict[str, Any]#
class ToolPayloadExtractor#

Bases: Protocol

Extracts a serialized result string from a tool scope-end payload.

extract_tool_result(data: Any) str | None#

Return the tool result as a string, or None when data is None.
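One possible implementation of this contract is sketched below. Serializing non-string results with json.dumps is an assumption about what "serialized" means here, and ExampleToolExtractor is a hypothetical name.

```python
from __future__ import annotations

import json
from typing import Any


class ExampleToolExtractor:
    """Hypothetical extractor: strings pass through, other values become JSON."""

    def extract_tool_result(self, data: Any) -> str | None:
        if data is None:
            return None
        if isinstance(data, str):
            return data
        # sort_keys gives a stable string; default=str covers non-JSON types.
        return json.dumps(data, sort_keys=True, default=str)


tool_extractor = ExampleToolExtractor()
serialized = tool_extractor.extract_tool_result({"b": 1, "a": 2})
```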

register_anthropic_messages_v1() None#

Install the Anthropic Messages JSON Schema and LLM extractor.

Idempotent — safe to call multiple times. Registers anthropic/messages@1 in both SCHEMA_REGISTRY (validation) and LLM_EXTRACTOR_REGISTRY (extraction). Call this once at process startup before invoking the converter on Anthropic-shaped payloads.

register_gemini_generate_content_v1() None#

Install the Gemini generateContent JSON Schema and LLM extractor.

Idempotent — safe to call multiple times. Registers gemini/generate-content@1 in both SCHEMA_REGISTRY and LLM_EXTRACTOR_REGISTRY. Call this once at process startup before invoking the converter on Gemini-shaped payloads.

register_llm_extractor(
name: str,
version: str,
extractor: LlmPayloadExtractor,
) None#

Register an LLM payload extractor for (name, version).

register_mark_extractor(
name: str,
version: str,
extractor: MarkPayloadExtractor,
) None#

Register a mark payload extractor for (name, version).

register_tool_extractor(
name: str,
version: str,
extractor: ToolPayloadExtractor,
) None#

Register a tool payload extractor for (name, version).

class Flags#

Bases: enum.StrEnum

Canonical behavioral flags for scope events (spec §2.1).

Each flag describes the exceptional runtime property of a scope; absence means the documented default applies.

PARALLEL = 'parallel'#
RELOCATABLE = 'relocatable'#
STATEFUL = 'stateful'#
STREAMING = 'streaming'#
REMOTE = 'remote'#
read_jsonl(path: str | pathlib.Path) list[nat.atof.events.Event]#

Read an ATOF JSON-Lines file and return a list of typed Event objects.

Each line is parsed as a JSON object and validated against the Event discriminated union. Blank lines are skipped. Events are returned sorted by .ts_micros (the normalized int-microsecond timestamp, spec §5.1) so downstream consumers get a stable ordering across mixed str/int timestamp streams.
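The read path described above (skip blank lines, parse, sort by normalized timestamp) can be approximated on plain dicts. Several details are assumptions rather than facts from the spec: the wire field is called "timestamp" here, string timestamps are taken to be ISO 8601, integer timestamps are taken to already be microseconds, and naive strings are treated as UTC. The helper names to_micros and parse_jsonl are hypothetical.

```python
from __future__ import annotations

import json
from datetime import datetime, timedelta, timezone
from typing import Any

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_micros(ts: int | str) -> int:
    """Normalize a timestamp to integer microseconds since the Unix epoch."""
    if isinstance(ts, int):
        return ts  # assumed to be microseconds already
    parsed = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    # Integer division of timedeltas avoids floating-point rounding.
    return (parsed - _EPOCH) // timedelta(microseconds=1)


def parse_jsonl(text: str) -> list[dict[str, Any]]:
    """Parse JSON-Lines text, skipping blank lines, sorted by timestamp."""
    rows = [json.loads(line) for line in text.splitlines() if line.strip()]
    return sorted(rows, key=lambda row: to_micros(row["timestamp"]))


sample = (
    '{"kind": "mark", "timestamp": "1970-01-01T00:00:02Z"}\n'
    "\n"
    '{"kind": "mark", "timestamp": 1000000}\n'
)
ordered = parse_jsonl(sample)
```

Sorting on the normalized integer rather than on the raw field is what makes the order stable when string and integer timestamps are mixed in one file.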

write_jsonl(
events: list[nat.atof.events.Event],
path: str | pathlib.Path,
) None#

Write a list of Event objects to a JSON-Lines file.

Each event is serialized as a single JSON line. The file ends with a trailing newline. Optional fields with None values are emitted as explicit null on the wire (matching the spec wire envelope example in atof-event-format.md §1).
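The wire behaviour described above (one JSON object per line, explicit null for None, trailing newline) can be reproduced with the standard library alone. This sketch serializes plain dicts rather than Event objects, and dump_jsonl is a hypothetical helper name.

```python
from __future__ import annotations

import json
from typing import Any


def dump_jsonl(rows: list[dict[str, Any]]) -> str:
    """Serialize each row as one JSON line; None is emitted as null."""
    return "".join(json.dumps(row) + "\n" for row in rows)


rows = [
    {"kind": "mark", "category": None},
    {"kind": "scope", "scope_category": "start"},
]
wire = dump_jsonl(rows)
```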

ANTHROPIC_MESSAGES_V1: dict[str, Any]#
GEMINI_GENERATE_CONTENT_V1: dict[str, Any]#
SCHEMA_REGISTRY: dict[tuple[str, str], dict[str, Any]]#
lookup_schema(name: str, version: str) dict[str, Any] | None#

Return the registered schema for (name, version) or None.

register_schema(
name: str,
version: str,
schema: dict[str, Any],
) None#

Register a JSON Schema for ATOF events whose data_schema matches {name, version}.

Overwrites any existing entry with the same key.
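The register/lookup semantics above amount to a dict keyed on (name, version). The sketch below mirrors that behaviour with hypothetical names (example_registry, example_register, example_lookup) and a made-up schema name; it does not touch the real SCHEMA_REGISTRY.

```python
from __future__ import annotations

from typing import Any

# Hypothetical stand-in for a registry keyed on (name, version).
example_registry: dict[tuple[str, str], dict[str, Any]] = {}


def example_register(name: str, version: str, schema: dict[str, Any]) -> None:
    """Store the schema, overwriting any entry with the same key."""
    example_registry[(name, version)] = schema


def example_lookup(name: str, version: str) -> dict[str, Any] | None:
    """Return the stored schema, or None when nothing is registered."""
    return example_registry.get((name, version))


example_register("example/chat", "1", {"type": "object"})
example_register("example/chat", "1", {"type": "object", "required": ["messages"]})
found = example_lookup("example/chat", "1")
missing = example_lookup("example/chat", "2")
```

Because a second registration under the same key replaces the first, repeating an identical registration has no further effect.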