nat.atof.extractors#

Pluggable payload extractors for the ATOF→ATIF converter.

The ATOF wire envelope is producer-agnostic, but the contents of event.data are producer-defined. The converter must translate those contents into ATIF step fields (messages, tool calls, tool results, mark-lifted sources). This module defines three Protocol interfaces and three registries that let producers plug in their own extractors, keyed on the producer-declared data_schema = {name, version}:

  • LlmPayloadExtractor — for category == "llm" scope events: parses input messages, output text, and assistant tool_calls.

  • ToolPayloadExtractor — for category == "tool" scope-end events: serializes the tool result to a string.

  • MarkPayloadExtractor — for mark events whose payload carries a role hint that should lift to an ATIF step source.

LLM extractors are produced by combining a declarative SchemaMap with the generic SchemaMapLlmExtractor engine. A SchemaMap captures the per-provider field paths (where input messages live, where output text lives, where tool calls live) plus three optional hooks for the irreducible per-provider transforms: polymorphic content unpacking, output-message decomposition, and tool-call shape adaptation. Most providers are expressible as pure paths; richer providers (Anthropic content blocks, Gemini parts) use the hooks.

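The module ships one built-in extractor per protocol:

  • OpenAiChatCompletionsLlmExtractor: the default LlmPayloadExtractor.

  • GenericToolResultExtractor: the default ToolPayloadExtractor.

  • NatRoleMarkExtractor: the default MarkPayloadExtractor.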

Register new extractors before calling the converter. For an OpenAI-shaped provider, define a SchemaMap and register it:

from nat.atof.extractors import (
    SchemaMap, SchemaMapLlmExtractor, register_llm_extractor,
)

MYCO_MAP = SchemaMap(
    name="myco/chat", version="1",
    input_messages_paths=("messages",),
    output_text_paths=("response",),
    output_tool_calls_paths=("tool_calls",),
)
register_llm_extractor("myco/chat", "1", SchemaMapLlmExtractor(MYCO_MAP))

For richer shapes (Anthropic content blocks, Gemini parts), use the hook fields on SchemaMap to handle the irreducible transforms.

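Attributes#

OPENAI_CHAT_COMPLETIONS_V1_MAP

ANTHROPIC_MESSAGES_V1_MAP

GEMINI_GENERATE_CONTENT_V1_MAP

DEFAULT_LLM_EXTRACTOR

DEFAULT_TOOL_EXTRACTOR

DEFAULT_MARK_EXTRACTOR

LLM_EXTRACTOR_REGISTRY

TOOL_EXTRACTOR_REGISTRY

MARK_EXTRACTOR_REGISTRY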

Classes#

LlmPayloadExtractor

Extracts ATIF-relevant fields from an llm scope event's data.

ToolPayloadExtractor

Extracts a serialized result string from a tool scope-end payload.

MarkPayloadExtractor

Classifies a mark event payload as either a role-lifted step (user/system/agent) or an opaque system step.

SchemaMap

Declarative description of where ATIF-relevant fields live within a provider's LLM payload, plus optional hooks for irreducible transforms.

SchemaMapLlmExtractor

Generic LLM payload extractor driven by a SchemaMap.

OpenAiChatCompletionsLlmExtractor

Reference LLM extractor accepting both direct and nested OpenAI shapes.

GenericToolResultExtractor

Unwraps {result: X} or {output: X} single-key wrappers into a primitive or JSON-serialized string; otherwise serializes the whole payload as compact JSON.

NatRoleMarkExtractor

Lifts a mark event to a sourced ATIF step when its payload carries data.role ∈ {"user", "system", "agent"}.

Functions#

register_anthropic_messages_v1(→ None)

Install the Anthropic Messages JSON Schema and LLM extractor.

register_gemini_generate_content_v1(→ None)

Install the Gemini generateContent JSON Schema and LLM extractor.

register_llm_extractor(→ None)

Register an LLM payload extractor for (name, version).

register_tool_extractor(→ None)

Register a tool payload extractor for (name, version).

register_mark_extractor(→ None)

Register a mark payload extractor for (name, version).

resolve_llm_extractor(→ LlmPayloadExtractor)

Return the LLM extractor registered for data_schema, or the built-in OpenAI chat-completions extractor if unregistered/absent.

resolve_tool_extractor(→ ToolPayloadExtractor)

Return the tool extractor registered for data_schema, or the generic result-unwrap extractor if unregistered/absent.

resolve_mark_extractor(→ MarkPayloadExtractor)

Return the mark extractor registered for data_schema, or the built-in role-lifting extractor if unregistered/absent.

Module Contents#

class LlmPayloadExtractor#

Bases: Protocol

Extracts ATIF-relevant fields from an llm scope event’s data.

Implementations MUST be pure functions over data — no side effects, no network, no filesystem access. Return empty collections or strings when a field is not present; the converter distinguishes “legitimately empty” from “shape mismatch” at the dispatch layer.

extract_input_messages(data: Any) → list[dict[str, Any]]#

Return the chat history messages from an LLM scope-start payload.

Each message SHOULD carry role and content keys; content MAY be a string or a multimodal part list (ATIF v1.6+).

extract_output_text(data: Any) → str#

Return the assistant text from an LLM scope-end payload.

Returns "" when the response carries only tool_calls or has no text content.

extract_tool_calls(data: Any) → list[dict[str, Any]]#

Return the tool_calls issued by the assistant in this turn.

Each dict MUST carry tool_call_id, function_name, and arguments (dict). Returns [] when no tool was called.
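Because LlmPayloadExtractor is a Protocol, a class only needs the three matching methods; it does not have to inherit from anything. The following is a minimal sketch of a hand-written extractor for an invented payload shape. The class name MycoLlmExtractor and the payload keys prompt, reply, calls, tool, and args are hypothetical and are not part of this module.

```python
from typing import Any


class MycoLlmExtractor:
    """Hypothetical extractor for a made-up producer payload.

    Assumed shapes (illustration only):
      scope start: {"prompt": [{"role": ..., "content": ...}, ...]}
      scope end:   {"reply": "...", "calls": [{"id", "tool", "args"}]}
    """

    def extract_input_messages(self, data: Any) -> list[dict[str, Any]]:
        # Return an empty list instead of raising on an unexpected shape.
        if not isinstance(data, dict):
            return []
        messages = data.get("prompt")
        if not isinstance(messages, list):
            return []
        return [m for m in messages if isinstance(m, dict)]

    def extract_output_text(self, data: Any) -> str:
        # "" when the response has no text (for example, tool calls only).
        if not isinstance(data, dict):
            return ""
        reply = data.get("reply")
        return reply if isinstance(reply, str) else ""

    def extract_tool_calls(self, data: Any) -> list[dict[str, Any]]:
        if not isinstance(data, dict):
            return []
        calls = data.get("calls")
        if not isinstance(calls, list):
            return []
        result = []
        for call in calls:
            if not isinstance(call, dict):
                continue
            args = call.get("args")
            # Each entry carries the three keys the protocol requires.
            result.append({
                "tool_call_id": str(call.get("id", "")),
                "function_name": str(call.get("tool", "")),
                "arguments": args if isinstance(args, dict) else {},
            })
        return result
```

The methods read only from data and touch no external state, in line with the purity requirement above. An instance of such a class could then be passed to register_llm_extractor() under the producer's schema name and version.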

class ToolPayloadExtractor#

Bases: Protocol

Extracts a serialized result string from a tool scope-end payload.

extract_tool_result(data: Any) → str | None#

Return the tool result as a string, or None when data is None.
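As an illustration of the contract, here is a minimal sketch of a custom tool extractor for an invented shell-style payload. The class name ShellToolExtractor and the stdout key are hypothetical; only the method name and the None-in, None-out behavior come from the protocol above.

```python
import json
from typing import Any, Optional


class ShellToolExtractor:
    """Hypothetical extractor for payloads shaped like {"stdout": "...", ...}."""

    def extract_tool_result(self, data: Any) -> Optional[str]:
        # The protocol asks for None when there is no payload at all.
        if data is None:
            return None
        # Prefer the captured standard output when it is present.
        if isinstance(data, dict) and isinstance(data.get("stdout"), str):
            return data["stdout"]
        # Plain strings pass through unchanged.
        if isinstance(data, str):
            return data
        # Anything else is rendered as compact JSON (an assumption of this sketch).
        return json.dumps(data, separators=(",", ":"), default=str)
```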

class MarkPayloadExtractor#

Bases: Protocol

Classifies a mark event payload as either a role-lifted step (user/system/agent) or an opaque system step.

extract_role_and_content(data: Any) → tuple[str, Any] | None#

If the mark should lift to an ATIF step with a specific source, return (source, content). Otherwise return None to fall through to the opaque-system-step path.

source MUST be one of "user", "system", "agent". content is passed through as-is (string or part list).
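A minimal sketch of a custom mark extractor follows, assuming an invented payload that labels turns with a speaker field rather than a role. The class name SpeakerMarkExtractor, the speaker and text keys, and the mapping table are all hypothetical.

```python
from typing import Any, Optional

# Hypothetical mapping from a producer's speaker labels to ATIF sources.
_SPEAKER_TO_SOURCE = {"human": "user", "bot": "agent", "system": "system"}


class SpeakerMarkExtractor:
    """Hypothetical extractor for marks shaped like {"speaker": ..., "text": ...}."""

    def extract_role_and_content(self, data: Any) -> Optional[tuple[str, Any]]:
        if not isinstance(data, dict):
            return None
        speaker = data.get("speaker")
        if not isinstance(speaker, str):
            return None
        source = _SPEAKER_TO_SOURCE.get(speaker)
        if source is None:
            # Unknown speaker: fall through to the opaque-system-step path.
            return None
        # Content is passed through as-is (string or part list).
        return source, data.get("text", "")
```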

class SchemaMap#

Declarative description of where ATIF-relevant fields live within a provider’s LLM payload, plus optional hooks for irreducible transforms.

A SchemaMap captures three things:

  1. Field paths — dotted paths (with numeric list indices) telling the engine where to find input messages, output text, and output tool calls. Each field accepts a tuple of candidate paths; the engine tries them in order and uses the first hit.

  2. Per-tool-call sub-paths — for providers whose tool-call shape fits the OpenAI flat-or-nested convention. Each tool call is a dict; these paths name where ID/name/arguments live within that dict.

  3. Optional hooks — escape hatches for the three transforms that can’t be expressed declaratively:

    • normalize_input_messages: input data → ATIF-shaped message list. Use when content is polymorphic (Anthropic string-or-blocks, Gemini parts) and a single field-path can’t flatten it.

    • normalize_output_message: output data → (text, tool_calls) pair. Use when output text and tool calls coexist in the same polymorphic structure (Anthropic content blocks).

    • transform_tool_call: per-call dict adapter. Use when tool calls don’t carry an ID (Gemini synthesizes from name+index) or use non-OpenAI nesting.

Hooks always win over paths. If normalize_output_message is set, the engine ignores output_text_paths and output_tool_calls_paths.

Pure-paths providers (OpenAI) leave the hooks at None. Mixed providers (Anthropic, Gemini) use one or two hooks.

Parameters:
  • name – Schema name (e.g. "openai/chat-completions").

  • version – Schema version string.

  • input_messages_paths – Candidate paths to the input messages array.

  • output_text_paths – Candidate paths to the output assistant text.

  • output_tool_calls_paths – Candidate paths to the output tool-calls array.

  • tool_call_id_paths – Candidate sub-paths for tool-call ID.

  • tool_call_name_paths – Candidate sub-paths for tool-call function name.

  • tool_call_args_paths – Candidate sub-paths for tool-call arguments.

  • tool_call_args_parse_json – When True, parse string arguments as JSON.

  • role_aliases – Map of provider role values to canonical role values (e.g., {"model": "assistant"} for Gemini). Applied to messages extracted via field paths; hooks bypass this.

  • normalize_input_messages – Optional hook overriding path-based input extraction. Signature: (data) -> list[{"role", "content", ...}].

  • normalize_output_message – Optional hook overriding path-based output extraction. Signature: (data) -> (text, tool_calls).

  • transform_tool_call – Optional per-call adapter. Signature: (raw_call_dict, index) -> ATIF-shaped {"tool_call_id", "function_name", "arguments"}. When set, replaces the per-tool-call path resolution entirely.
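The hook signatures above can be illustrated with two small functions. This is a sketch under stated assumptions, not the shipped Anthropic or Gemini implementation: the function names are hypothetical, the text blocks are joined without a separator, the raw Gemini call is assumed to arrive as {"name", "args"}, and the synthesized ID format name_index is invented for the example.

```python
from typing import Any


def anthropic_output(data: Any) -> tuple[str, list[dict[str, Any]]]:
    """Candidate normalize_output_message hook: (data) -> (text, tool_calls)."""
    blocks = data.get("content") if isinstance(data, dict) else None
    if not isinstance(blocks, list):
        return "", []
    texts: list[str] = []
    calls: list[dict[str, Any]] = []
    for block in blocks:
        if not isinstance(block, dict):
            continue
        if block.get("type") == "text":
            texts.append(str(block.get("text", "")))
        elif block.get("type") == "tool_use":
            args = block.get("input")
            calls.append({
                "tool_call_id": str(block.get("id", "")),
                "function_name": str(block.get("name", "")),
                "arguments": args if isinstance(args, dict) else {},
            })
    # Assumption: text blocks are concatenated with no separator.
    return "".join(texts), calls


def gemini_tool_call(raw: dict[str, Any], index: int) -> dict[str, Any]:
    """Candidate transform_tool_call hook: (raw_call_dict, index) -> ATIF dict."""
    name = str(raw.get("name", ""))
    args = raw.get("args")
    return {
        # Assumption: the ID is synthesized as "<name>_<index>".
        "tool_call_id": f"{name}_{index}",
        "function_name": name,
        "arguments": args if isinstance(args, dict) else {},
    }
```

Functions like these would be passed as the normalize_output_message and transform_tool_call fields of a SchemaMap. Since hooks bypass role_aliases, an input hook would need to emit canonical roles itself.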

name: str#
version: str#
input_messages_paths: tuple[str, ...] = ()#
output_text_paths: tuple[str, ...] = ()#
output_tool_calls_paths: tuple[str, ...] = ()#
tool_call_id_paths: tuple[str, ...] = ('id',)#
tool_call_name_paths: tuple[str, ...] = ('name', 'function.name')#
tool_call_args_paths: tuple[str, ...] = ('arguments', 'function.arguments')#
tool_call_args_parse_json: bool = True#
role_aliases: collections.abc.Mapping[str, str]#
normalize_input_messages: collections.abc.Callable[[Any], list[dict[str, Any]]] | None = None#
normalize_output_message: collections.abc.Callable[[Any], tuple[str, list[dict[str, Any]]]] | None = None#
transform_tool_call: collections.abc.Callable[[dict[str, Any], int], dict[str, Any]] | None = None#
class SchemaMapLlmExtractor(schema_map: SchemaMap)#

Generic LLM payload extractor driven by a SchemaMap.

Implements LlmPayloadExtractor by routing extraction through the map’s hooks (when set) or its declarative field paths (otherwise). A single instance per (name, version) is the intended pattern; register it with register_llm_extractor().

schema_map#
extract_input_messages(data: Any) → list[dict[str, Any]]#
extract_output_text(data: Any) → str#
extract_tool_calls(data: Any) → list[dict[str, Any]]#
_apply_role_aliases(messages: list[Any]) → list[dict[str, Any]]#
_extract_tool_call_fields(raw: dict[str, Any]) → dict[str, Any]#
OPENAI_CHAT_COMPLETIONS_V1_MAP#
class OpenAiChatCompletionsLlmExtractor#

Bases: SchemaMapLlmExtractor

Reference LLM extractor accepting both direct and nested OpenAI shapes.

Thin convenience wrapper around OPENAI_CHAT_COMPLETIONS_V1_MAP. Behavior is identical to instantiating SchemaMapLlmExtractor(OPENAI_CHAT_COMPLETIONS_V1_MAP).

Input shapes (extract_input_messages):

  • {"messages": [...]}

  • {"content": {"messages": [...]}}

Output shapes (extract_output_text):

  • {"content": "..."}

  • {"choices": [{"message": {"content": "..."}}]}

Tool-call shapes (extract_tool_calls):

  • Flat: {"tool_calls": [{"id", "name", "arguments"}]}

  • Nested: {"choices": [{"message": {"tool_calls": [...]}}]}

  • Per-call: either flat {id, name, arguments} or the OpenAI {id, function: {name, arguments}} form.
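The two per-call forms listed above can be reduced to one ATIF-shaped dict roughly as follows. This is a sketch, not the extractor's code: the function name normalize_openai_call is hypothetical, and falling back to an empty dict when the arguments string is not valid JSON is an assumption.

```python
import json
from typing import Any


def normalize_openai_call(raw: dict[str, Any]) -> dict[str, Any]:
    """Accept flat {id, name, arguments} or nested {id, function: {...}}."""
    fn = raw.get("function")
    if not isinstance(fn, dict):
        fn = {}
    # Flat keys are tried first, then the nested "function" object.
    name = raw.get("name", fn.get("name", ""))
    args = raw.get("arguments", fn.get("arguments", {}))
    # OpenAI sends arguments as a JSON string; parse it into a dict.
    if isinstance(args, str):
        try:
            args = json.loads(args)
        except json.JSONDecodeError:
            args = {}  # assumption: unparseable arguments become {}
    if not isinstance(args, dict):
        args = {}
    return {
        "tool_call_id": str(raw.get("id", "")),
        "function_name": str(name),
        "arguments": args,
    }
```

The lookup order mirrors the defaults of tool_call_name_paths and tool_call_args_paths, and the string parsing corresponds to tool_call_args_parse_json being True.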

ANTHROPIC_MESSAGES_V1_MAP#
register_anthropic_messages_v1() → None#

Install the Anthropic Messages JSON Schema and LLM extractor.

Idempotent — safe to call multiple times. Registers anthropic/messages@1 in both SCHEMA_REGISTRY (validation) and LLM_EXTRACTOR_REGISTRY (extraction). Call this once at process startup before invoking the converter on Anthropic-shaped payloads.

GEMINI_GENERATE_CONTENT_V1_MAP#
register_gemini_generate_content_v1() → None#

Install the Gemini generateContent JSON Schema and LLM extractor.

Idempotent — safe to call multiple times. Registers gemini/generate-content@1 in both SCHEMA_REGISTRY and LLM_EXTRACTOR_REGISTRY. Call this once at process startup before invoking the converter on Gemini-shaped payloads.

class GenericToolResultExtractor#

Unwraps {result: X} or {output: X} single-key wrappers into a primitive or JSON-serialized string; otherwise serializes the whole payload as compact JSON.

extract_tool_result(data: Any) → str | None#
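The unwrap-then-serialize behavior described for GenericToolResultExtractor can be approximated like this. The function name unwrap_tool_result is hypothetical, and rendering non-string primitives through json.dumps (so 42 becomes "42") is an assumption; the class itself may format them differently.

```python
import json
from typing import Any, Optional


def unwrap_tool_result(data: Any) -> Optional[str]:
    """Sketch of: unwrap {result: X} / {output: X}, else compact JSON."""
    if data is None:
        return None
    # Only a dict with exactly one key, named "result" or "output", is unwrapped.
    if isinstance(data, dict) and len(data) == 1:
        key = next(iter(data))
        if key in ("result", "output"):
            data = data[key]
    # Strings pass through; everything else becomes compact JSON.
    if isinstance(data, str):
        return data
    return json.dumps(data, separators=(",", ":"), default=str)
```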
class NatRoleMarkExtractor#

Lifts a mark event to a sourced ATIF step when its payload carries data.role ∈ {"user", "system", "agent"}. Content is taken from data.content, then data.message (string fallback "").

_VALID_ROLES#
extract_role_and_content(data: Any) → tuple[str, Any] | None#
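The role-lifting rule described for NatRoleMarkExtractor can be sketched as a single function. The name lift_mark is hypothetical, and treating a content value of None the same as a missing key is an assumption of this sketch.

```python
from typing import Any, Optional

_ROLES = frozenset({"user", "system", "agent"})


def lift_mark(data: Any) -> Optional[tuple[str, Any]]:
    """Return (source, content) for role-carrying marks, else None."""
    if not isinstance(data, dict):
        return None
    role = data.get("role")
    if not isinstance(role, str) or role not in _ROLES:
        return None  # falls through to the opaque-system-step path
    # Content comes from "content" first, then "message", then "".
    content = data.get("content")
    if content is None:
        content = data.get("message")
    if content is None:
        content = ""
    return role, content
```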
DEFAULT_LLM_EXTRACTOR: LlmPayloadExtractor#
DEFAULT_TOOL_EXTRACTOR: ToolPayloadExtractor#
DEFAULT_MARK_EXTRACTOR: MarkPayloadExtractor#
LLM_EXTRACTOR_REGISTRY: dict[tuple[str, str], LlmPayloadExtractor]#
TOOL_EXTRACTOR_REGISTRY: dict[tuple[str, str], ToolPayloadExtractor]#
MARK_EXTRACTOR_REGISTRY: dict[tuple[str, str], MarkPayloadExtractor]#
register_llm_extractor(
name: str,
version: str,
extractor: LlmPayloadExtractor,
) → None#

Register an LLM payload extractor for (name, version).

register_tool_extractor(
name: str,
version: str,
extractor: ToolPayloadExtractor,
) → None#

Register a tool payload extractor for (name, version).

register_mark_extractor(
name: str,
version: str,
extractor: MarkPayloadExtractor,
) → None#

Register a mark payload extractor for (name, version).

resolve_llm_extractor(
data_schema: dict[str, Any] | None,
) → LlmPayloadExtractor#

Return the LLM extractor registered for data_schema, or the built-in OpenAI chat-completions extractor if unregistered/absent.

resolve_tool_extractor(
data_schema: dict[str, Any] | None,
) → ToolPayloadExtractor#

Return the tool extractor registered for data_schema, or the generic result-unwrap extractor if unregistered/absent.

resolve_mark_extractor(
data_schema: dict[str, Any] | None,
) → MarkPayloadExtractor#

Return the mark extractor registered for data_schema, or the built-in role-lifting extractor if unregistered/absent.