nat.plugins.data_flywheel.observability.processor.trace_conversion#

Submodules#

Attributes#

Classes#

TraceAdapterRegistry

Registry for trace source to target type conversions.

Functions#

extract_timestamp(→ int)

Extract timestamp from a span.

extract_token_usage(...)

Extract token usage information from a span.

extract_usage_info(...)

Extract usage information from a span.

span_to_dfw_record(→ pydantic.BaseModel)

Convert a span to Data Flywheel record using registered trace adapters.

Package Contents#

extract_timestamp(span: nat.data_models.span.Span) int#

Extract timestamp from a span.

Args:

span (Span): The span to extract timestamp from

Returns:

int: The timestamp

extract_token_usage(
span: nat.data_models.span.Span,
) nat.data_models.intermediate_step.TokenUsageBaseModel#

Extract token usage information from a span.

Args:

span (Span): The span to extract token usage from

Returns:

TokenUsageBaseModel: The token usage information

extract_usage_info(
span: nat.data_models.span.Span,
) nat.data_models.intermediate_step.UsageInfo#

Extract usage information from a span.

Args:

span (Span): The span to extract usage information from

Returns:

UsageInfo: The usage information

span_to_dfw_record(
span: nat.data_models.span.Span,
to_type: type[pydantic.BaseModel],
client_id: str,
) pydantic.BaseModel#

Convert a span to Data Flywheel record using registered trace adapters.

Creates a TraceContainer from the span, automatically detects the trace source type via Pydantic schema matching, then uses the registered converter to transform it to the specified target type.

Args:

span (Span): The span containing trace data to convert.

to_type (type[BaseModel]): Target Pydantic model type for the conversion.

client_id (str): Client identifier to include in the trace data.

Returns:

BaseModel: Converted record of the specified type.

Raises:

ValueError: If no converter is registered for the detected source type -> target type, or if the conversion fails.
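The flow described for span_to_dfw_record (wrap the span data, detect the source type by schema matching, dispatch to a registered converter) can be illustrated with a small standard-library sketch. Everything in it is a hypothetical stand-in: OpenAILikeSource, OtherSource, FlatRecord, detect_source and to_record are not part of nat, and dataclass field-name matching is used here only as a simplified substitute for the Pydantic union matching the real function relies on.

```python
from dataclasses import dataclass, fields
from typing import Any, Callable


# Hypothetical trace source schemas (stand-ins, not nat classes).
@dataclass
class OpenAILikeSource:
    model: str
    messages: list


@dataclass
class OtherSource:
    endpoint: str
    payload: dict


# Hypothetical target record type.
@dataclass
class FlatRecord:
    client_id: str
    model: str
    n_messages: int


# source type -> {target type -> converter}; only one pair is registered.
CONVERTERS: dict[type, dict[type, Callable[[Any, str], Any]]] = {
    OpenAILikeSource: {
        FlatRecord: lambda src, client_id: FlatRecord(
            client_id, src.model, len(src.messages)
        ),
    },
}


def detect_source(raw: dict) -> Any:
    """Instantiate the first schema whose field names equal the keys of raw."""
    for candidate in (OpenAILikeSource, OtherSource):
        if {f.name for f in fields(candidate)} == set(raw):
            return candidate(**raw)
    raise ValueError("no trace source schema matches the span data")


def to_record(raw: dict, to_type: type, client_id: str) -> Any:
    """Detect the source type, then dispatch to the registered converter."""
    source = detect_source(raw)
    converter = CONVERTERS.get(type(source), {}).get(to_type)
    if converter is None:
        raise ValueError(
            f"no converter registered for {type(source).__name__} -> {to_type.__name__}"
        )
    return converter(source, client_id)


record = to_record({"model": "m", "messages": [1, 2]}, FlatRecord, "client-1")
```

In this sketch a source that is detected but has no converter for the requested target (OtherSource to FlatRecord) raises ValueError, mirroring the documented failure mode.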

class TraceAdapterRegistry#

Registry for trace source to target type conversions.

Maintains schema detection through Pydantic unions while enabling dynamic registration of converter functions for different trace source types.

_registered_types: dict[type, dict[type, collections.abc.Callable]]#

_union_cache: Any = None#

classmethod register_adapter(
trace_source_model: type,
) collections.abc.Callable[[collections.abc.Callable], collections.abc.Callable]#

Register adapter with a trace source Pydantic model.

The model defines the schema for union-based detection, allowing automatic schema matching without explicit framework/provider specification.

Args:

trace_source_model (type): Pydantic model class that defines the trace source schema (e.g., OpenAITraceSource, NIMTraceSource, CustomTraceSource)

Returns:

Callable: Decorator function that registers the converter
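A decorator-based registration of this shape can be sketched as follows. MiniRegistry, SourceA, TargetA and a_to_target are hypothetical names used only for illustration. Because the documented signature takes only the source model, the sketch assumes the target type is read from the converter's return annotation; this page does not state how the real registry determines it.

```python
from typing import Callable, get_type_hints


class MiniRegistry:
    """Hypothetical stand-in for a source -> target converter registry."""

    _registered: dict[type, dict[type, Callable]] = {}

    @classmethod
    def register_adapter(cls, source_model: type) -> Callable[[Callable], Callable]:
        def decorator(func: Callable) -> Callable:
            # Assumption: the target type comes from the return annotation.
            target = get_type_hints(func).get("return")
            if target is None:
                raise ValueError("converter needs a return type annotation")
            cls._registered.setdefault(source_model, {})[target] = func
            return func  # the decorated function stays usable as-is

        return decorator


class SourceA:
    """Hypothetical trace source model."""


class TargetA:
    """Hypothetical target record type."""


@MiniRegistry.register_adapter(SourceA)
def a_to_target(container) -> TargetA:
    return TargetA()
```

Returning the original function from the decorator keeps the converter directly callable in tests, independent of the registry.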

classmethod convert(
trace_container: nat.plugins.data_flywheel.observability.schema.trace_container.TraceContainer,
to_type: type,
) Any#

Convert trace to target type using registered converter function.

Args:

trace_container (TraceContainer): TraceContainer with source data to convert

to_type (type): Target type to convert to

Returns:

Converted object of to_type

Raises:

ValueError: If no converter is registered for source->target combination

classmethod get_adapter(
trace_container: nat.plugins.data_flywheel.observability.schema.trace_container.TraceContainer,
to_type: type,
) collections.abc.Callable | None#

Get the converter function for a given trace source and target type.

Args:

trace_container (TraceContainer): TraceContainer with source data

to_type (type): Target type to convert to

Returns:

Converter function if registered, None if not found
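The documented contract differs between the two lookups: get_adapter returns None when nothing is registered, while convert raises ValueError. A minimal sketch of that contrast, using hypothetical stand-ins (SourceA, Report, Summary and the module-level functions are not nat APIs, and the source object is passed directly rather than wrapped in a TraceContainer):

```python
from typing import Any, Callable, Optional


class SourceA:
    """Hypothetical trace source type."""


class Report:
    """Hypothetical target type with a registered converter."""


class Summary:
    """Hypothetical target type with no registered converter."""


# source type -> {target type -> converter}
_converters: dict[type, dict[type, Callable]] = {
    SourceA: {Report: lambda src: Report()},
}


def get_adapter(source: Any, to_type: type) -> Optional[Callable]:
    """Return the converter, or None when the pair is not registered."""
    return _converters.get(type(source), {}).get(to_type)


def convert(source: Any, to_type: type) -> Any:
    """Run the converter, raising ValueError when the pair is not registered."""
    adapter = get_adapter(source, to_type)
    if adapter is None:
        raise ValueError(
            f"no converter for {type(source).__name__} -> {to_type.__name__}"
        )
    return adapter(source)
```

A caller that wants to probe for support without handling exceptions can check get_adapter first and only call convert when it returns a function.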

classmethod get_current_union() type#

Get the current source union with all registered source types.

Returns:

type: Union type containing all registered trace source types
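One way to build a union from a registry's source types at runtime is to subscript typing.Union with a tuple. The sketch below shows only that mechanism; build_union, SourceA and SourceB are hypothetical, and raising on an empty registry is a choice made for this sketch, not documented behavior of get_current_union.

```python
from typing import Any, Union, get_args


class SourceA:
    """Hypothetical trace source type."""


class SourceB:
    """Hypothetical trace source type."""


def build_union(registered: dict) -> Any:
    """Build a Union over the keys (source types) of a registry dict."""
    source_types = tuple(registered)
    if not source_types:
        # Sketch-only decision: an empty union is not representable.
        raise ValueError("no trace source types registered")
    # Union[(A, B)] is equivalent to Union[A, B]; Union[(A,)] collapses to A.
    return Union[source_types]


two = build_union({SourceA: {}, SourceB: {}})
one = build_union({SourceA: {}})
```

Here get_args(two) yields (SourceA, SourceB), and one is simply SourceA because a single-member union collapses to its member.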

classmethod _rebuild_union()#

Rebuild the union with all registered trace source types.

classmethod _update_trace_source_model()#

Update the TraceContainer model to use the current dynamic union.

classmethod unregister_adapter(source_type: type, target_type: type) bool#

Unregister a specific adapter.

Args:

source_type (type): The trace source type

target_type (type): The target conversion type

Returns:

bool: True if adapter was found and removed, False if not found

classmethod unregister_all_adapters(source_type: type) int#

Unregister all adapters for a given source type.

Args:

source_type (type): The trace source type to remove all converters for

Returns:

int: Number of converters removed

classmethod clear_registry() int#

Clear all registered adapters. Useful for testing cleanup.

Returns:

int: Total number of converters removed
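The return values of the three removal methods above (bool, int, int) can be illustrated on a plain nested dict. This is a sketch with hypothetical module-level functions, not the registry's implementation; in particular, pruning a source type once its last converter is removed is an assumption, and the real methods may also rebuild the source union, which is omitted here.

```python
from typing import Callable

# source type -> {target type -> converter}; int/float/str stand in for real types.
registry: dict[type, dict[type, Callable]] = {
    int: {str: str, float: float},
    float: {str: str},
}


def unregister_adapter(source_type: type, target_type: type) -> bool:
    """Remove one converter; True if it existed, False otherwise."""
    targets = registry.get(source_type, {})
    if target_type not in targets:
        return False
    del targets[target_type]
    if not targets:
        # Assumption: drop source types that have no converters left.
        del registry[source_type]
    return True


def unregister_all_adapters(source_type: type) -> int:
    """Remove every converter for a source type; return how many were removed."""
    return len(registry.pop(source_type, {}))


def clear_registry() -> int:
    """Remove everything; return the total number of converters removed."""
    total = sum(len(targets) for targets in registry.values())
    registry.clear()
    return total


removed_one = unregister_adapter(int, float)    # existing pair
removed_missing = unregister_adapter(int, int)  # pair was never registered
removed_for_float = unregister_all_adapters(float)
removed_total = clear_registry()                # only int -> str remains
```

Returning counts rather than None makes test teardown easy to assert on, which fits the note that clear_registry is useful for testing cleanup.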

classmethod list_registered_types() dict[type, dict[type, collections.abc.Callable]]#

List all registered conversions: source_type -> {target_type -> converter}.

Returns:

dict[type, dict[type, Callable]]: Nested dict mapping source types to their available target conversions

register_adapter#