nat.atof.schemas#

Registered JSON Schemas for validating ATOF event.data payloads.

The ATOF envelope carries an optional data_schema = {name, version} identifier declaring the shape of event.data. Spec §2 leaves schema validation to the consumer.

This module maintains a process-wide registry that maps (name, version) to a JSON Schema dict and ships one built-in schema:

  • openai/chat-completions@1 — permissive shape check for LLM scope-start and scope-end payloads; accepts any object carrying at least one of the extractable top-level keys: messages, content, tool_calls, choices.
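As a rough illustration of what such a permissive shape check could look like, the sketch below expresses "an object with at least one of the four keys" as a JSON Schema that uses anyOf over required. The names EXTRACTABLE_KEYS, PERMISSIVE_CHAT_SCHEMA, and has_extractable_key are hypothetical; the actual contents of OPENAI_CHAT_COMPLETIONS_V1 are not shown on this page and may differ.

```python
from typing import Any

EXTRACTABLE_KEYS = ("messages", "content", "tool_calls", "choices")

# Hypothetical schema (an assumption, not the shipped constant): an object
# that carries at least one of the extractable top-level keys.
PERMISSIVE_CHAT_SCHEMA: dict[str, Any] = {
    "type": "object",
    "anyOf": [{"required": [key]} for key in EXTRACTABLE_KEYS],
}


def has_extractable_key(payload: Any) -> bool:
    """Plain-Python equivalent of the schema above, for illustration only."""
    return isinstance(payload, dict) and any(
        key in payload for key in EXTRACTABLE_KEYS
    )
```

Under this sketch, a payload such as {"messages": []} passes, while {"foo": 1} or a bare list does not.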

External producers register their own schemas via register_schema():

from nat.atof.schemas import register_schema

register_schema("myco/my-payload", "1", {
    "type": "object",
    "required": ["myco_field"],
})

Consumers validate an event by looking up the schema and calling jsonschema.validate(). The ATOF→ATIF converter wires this into its pre-pass and raises DataSchemaViolationError on failure.
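The lookup-then-validate flow described above might be sketched as follows. The registry functions are local stand-ins that mirror the documented signatures so the example runs on its own; real code would import them from nat.atof.schemas. The helper validate_event_data and its choice to skip events with a missing or unknown data_schema are assumptions of this sketch, not documented converter behavior.

```python
from __future__ import annotations

from typing import Any

import jsonschema

# Local stand-ins mirroring the documented signatures; in real code these
# would be imported from nat.atof.schemas.
SCHEMA_REGISTRY: dict[tuple[str, str], dict[str, Any]] = {}


def register_schema(name: str, version: str, schema: dict[str, Any]) -> None:
    SCHEMA_REGISTRY[(name, version)] = schema


def lookup_schema(name: str, version: str) -> dict[str, Any] | None:
    return SCHEMA_REGISTRY.get((name, version))


def validate_event_data(event: dict[str, Any]) -> bool:
    """Return True if event.data was validated, False if no schema applied.

    Raises jsonschema.ValidationError when the payload violates its schema.
    """
    declared = event.get("data_schema")
    if not declared:
        return False  # data_schema is optional on the envelope
    schema = lookup_schema(declared["name"], declared["version"])
    if schema is None:
        return False  # unknown schema: skipping is an assumption of this sketch
    jsonschema.validate(instance=event.get("data"), schema=schema)
    return True
```

This sketch lets jsonschema.ValidationError propagate to the caller; the converter is described as raising DataSchemaViolationError instead, and how it wraps the underlying error is not shown here.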

DESIGN NOTE: Producer-Declared Schema Discovery (Future)#

Today, registering a non-default schema or extractor is a consumer-side concern: the consumer calls register_schema() and nat.atof.extractors.register_llm_extractor() (or one of the register_*_v1() convenience helpers) before invoking the converter. The producer declares data_schema = {name, version} per event but offers no mechanism to deliver the schema or extractor logic along with the stream. This works when the consumer knows the producer in advance (the ATOF v0.1 expectation) but becomes a source of friction once a single consumer wants to ingest streams from multiple producers without prior coordination, for example a forensics tool replaying old trajectories from a producer it has never seen.

Three design options are on the table for a future ATOF revision; none are implemented yet. Captured here so the next iteration doesn’t relitigate the trade-off space:

  1. Stream-level schema manifest. Reserve the first line of the JSONL stream for a non-event manifest:

    {"type": "atof_schema_manifest",
     "schemas": [{"name": ..., "version": ..., "json_schema": {...},
                  "extractor_plugin": "anthropic.messages.v1"}]}
    

    Consumers parse the manifest, register the declared schemas and extractor plugins, then process events normally. Pros: backward compatible (consumers ignore an unknown first line), explicit, easy to ship. Cons: requires a new wire-format reservation; extractor_plugin references opaque code, which raises security and trust concerns.

  2. ATOF-native metadata on root scope-start. Embed the manifest in metadata._atof_schemas on the root agent ScopeStart event. This is already permitted by spec §2.1 (open metadata). Pros: no wire-format change, zero overhead for streams that don’t use it. Cons: late discovery (the consumer can’t pre-register before seeing events), and every producer has to remember this convention.

  3. Out-of-band manifest file. Ship a sidecar manifest alongside the JSONL (e.g. trajectory.jsonl + trajectory.manifest.json). Consumers load both. Pros: clean separation; schemas can be versioned and signed independently. Cons: two-file coupling is fragile; transports such as logging systems and Kafka often drop sidecars.

Recommendation when the work is taken up: prototype option 1 first, since it is the least invasive and is self-documenting in the stream itself. Decline option 3 unless storage transports demand it. Option 2 is a cheap fallback if option 1 hits backward-compatibility blockers.
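To make the stream-level manifest option more concrete, here is a minimal consumer-side sketch of how a reader might separate an optional first-line manifest from the events that follow. Nothing like this is implemented today; the function split_manifest and the constant MANIFEST_TYPE are hypothetical, and the manifest shape is taken from the example in the option's description.

```python
from __future__ import annotations

import json
from typing import Any, Iterable, Iterator

MANIFEST_TYPE = "atof_schema_manifest"  # value proposed in the design note


def split_manifest(
    lines: Iterable[str],
) -> tuple[list[dict[str, Any]], Iterator[dict[str, Any]]]:
    """Separate an optional first-line manifest from the events after it.

    Returns (declared_schemas, events). declared_schemas is empty when the
    first non-blank line is an ordinary event.
    """
    raw_lines = (raw for raw in lines if raw.strip())  # skip blank lines
    first_raw = next(raw_lines, None)
    if first_raw is None:
        return [], iter(())  # empty stream

    first = json.loads(first_raw)
    if isinstance(first, dict) and first.get("type") == MANIFEST_TYPE:
        schemas, head = first.get("schemas", []), []
    else:
        schemas, head = [], [first]  # no manifest: first line is an event

    def events() -> Iterator[dict[str, Any]]:
        yield from head
        for raw in raw_lines:
            yield json.loads(raw)

    return schemas, events()
```

A consumer could then pass each entry's name, version, and json_schema to register_schema() before iterating over the events. The sketch deliberately ignores extractor_plugin, since loading code named by an untrusted stream is the security concern raised for this option.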

This block is the architectural commitment record. Update it when the decision is made; do not expand the registry/helpers in this module without a corresponding spec amendment.

Attributes#

SCHEMA_REGISTRY

OPENAI_CHAT_COMPLETIONS_V1

ANTHROPIC_MESSAGES_V1

GEMINI_GENERATE_CONTENT_V1

Functions#

register_schema(→ None)

Register a JSON Schema for ATOF events whose data_schema matches {name, version}.

lookup_schema(→ dict[str, Any] | None)

Return the registered schema for (name, version) or None.

Module Contents#

SCHEMA_REGISTRY: dict[tuple[str, str], dict[str, Any]]#
register_schema(name: str, version: str, schema: dict[str, Any]) → None#

Register a JSON Schema for ATOF events whose data_schema matches {name, version}.

Overwrites any existing entry with the same key.

lookup_schema(name: str, version: str) → dict[str, Any] | None#

Return the registered schema for (name, version) or None.
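Because register_schema() silently overwrites an existing entry, a caller that wants to notice key collisions can check lookup_schema() first. The sketch below shows one way to do that. The registry functions are local stand-ins with the documented semantics so the example runs on its own, and register_schema_once is a hypothetical helper that is not part of this module.

```python
from __future__ import annotations

from typing import Any

# Stand-in registry with the documented semantics: registering overwrites,
# and a lookup miss returns None. Real code would import these functions
# from nat.atof.schemas.
_REGISTRY: dict[tuple[str, str], dict[str, Any]] = {}


def register_schema(name: str, version: str, schema: dict[str, Any]) -> None:
    _REGISTRY[(name, version)] = schema


def lookup_schema(name: str, version: str) -> dict[str, Any] | None:
    return _REGISTRY.get((name, version))


def register_schema_once(name: str, version: str, schema: dict[str, Any]) -> bool:
    """Hypothetical helper: register only if the key is free or unchanged."""
    existing = lookup_schema(name, version)
    if existing is not None and existing != schema:
        return False  # a different schema already owns this key
    register_schema(name, version, schema)
    return True
```

The helper returns False instead of raising so that the caller can decide whether a collision is an error or an expected re-registration.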

OPENAI_CHAT_COMPLETIONS_V1: dict[str, Any]#
ANTHROPIC_MESSAGES_V1: dict[str, Any]#
GEMINI_GENERATE_CONTENT_V1: dict[str, Any]#