observability.processor.trace_conversion.trace_adapter_registry#

Attributes#

Classes#

TraceAdapterRegistry

Registry for trace source to target type conversions.

Module Contents#

logger#
class TraceAdapterRegistry#

Registry for trace source to target type conversions.

Maintains schema detection through Pydantic unions while enabling dynamic registration of converter functions for different trace source types.

_registered_types: dict[type, dict[type, collections.abc.Callable]]#
_union_cache: Any = None#
classmethod register_adapter(
trace_source_model: type,
) -> collections.abc.Callable[[collections.abc.Callable], collections.abc.Callable]#

Register adapter with a trace source Pydantic model.

The model defines the schema for union-based detection, allowing automatic schema matching without explicit framework/provider specification.

Args:
trace_source_model (type): Pydantic model class that defines the trace source schema (e.g., OpenAITraceSource, NIMTraceSource, CustomTraceSource)

Returns:

Callable: Decorator function that registers the converter
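The decorator pattern described above can be sketched with the standard library alone. This is a minimal stand-in, not the library's implementation: `MiniRegistry`, `ExampleTraceSource`, and `to_dict` are hypothetical names, a plain class replaces the Pydantic model, and reading the target type from the converter's return annotation is an assumption, made because `register_adapter` takes only the source model.

```python
from collections.abc import Callable


class MiniRegistry:
    """Toy stand-in for TraceAdapterRegistry (hypothetical, stdlib only)."""

    # source type -> {target type -> converter}
    _registered_types: dict[type, dict[type, Callable]] = {}

    @classmethod
    def register_adapter(cls, trace_source_model: type) -> Callable[[Callable], Callable]:
        def decorator(func: Callable) -> Callable:
            # Assumption: the target type is taken from the return annotation.
            target_type = func.__annotations__.get("return")
            if target_type is None:
                raise ValueError(f"{func.__name__} needs a return type annotation")
            cls._registered_types.setdefault(trace_source_model, {})[target_type] = func
            return func  # the converter itself is returned unchanged

        return decorator


class ExampleTraceSource:
    """Hypothetical source model; the real ones are Pydantic models."""

    def __init__(self, prompt: str) -> None:
        self.prompt = prompt


@MiniRegistry.register_adapter(ExampleTraceSource)
def to_dict(source: ExampleTraceSource) -> dict:
    return {"prompt": source.prompt}


registered = MiniRegistry._registered_types[ExampleTraceSource]
```

Because the decorator returns the original function, `to_dict` remains directly callable after registration.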

classmethod convert(
trace_container: nat.plugins.data_flywheel.observability.schema.trace_container.TraceContainer,
to_type: type,
) -> Any#

Convert trace to target type using registered converter function.

Args:

trace_container (TraceContainer): TraceContainer with source data to convert

to_type (type): Target type to convert to

Returns:

Converted object of to_type

Raises:

ValueError: If no converter is registered for source->target combination

classmethod get_adapter(
trace_container: nat.plugins.data_flywheel.observability.schema.trace_container.TraceContainer,
to_type: type,
) -> collections.abc.Callable | None#

Get the converter function for a given trace source and target type.

Args:

trace_container (TraceContainer): TraceContainer with source data

to_type (type): Target type to convert to

Returns:

Converter function if registered, None if not found

classmethod get_current_union() -> type#

Get the current source union with all registered source types.

Returns:

type: Union type containing all registered trace source types
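One way to build a union from whatever source types are currently registered is to subscript `typing.Union` with a tuple. The sketch below shows only that general technique; `build_union`, `SourceA`, and `SourceB` are hypothetical, and returning `None` for an empty registry is an assumption, not documented behavior.

```python
from typing import Any, Union, get_args


class SourceA:  # hypothetical registered source type
    pass


class SourceB:  # hypothetical registered source type
    pass


def build_union(registered: dict[type, dict]) -> Any:
    """Build a Union over the registry's source types (its keys)."""
    source_types = tuple(registered)
    if not source_types:
        return None  # assumption: nothing to build from an empty registry
    # Union[(A, B)] is equivalent to Union[A, B].
    return Union[source_types]


union = build_union({SourceA: {}, SourceB: {}})
members = get_args(union)
```

A union over a single type collapses to that type itself, so `build_union({SourceA: {}})` returns `SourceA`.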

classmethod _rebuild_union()#

Rebuild the union with all registered trace source types.

classmethod _update_trace_source_model()#

Update the TraceContainer model to use the current dynamic union.

classmethod unregister_adapter(source_type: type, target_type: type) -> bool#

Unregister a specific adapter.

Args:

source_type (type): The trace source type

target_type (type): The target conversion type

Returns:

bool: True if adapter was found and removed, False if not found

classmethod unregister_all_adapters(source_type: type) -> int#

Unregister all adapters for a given source type.

Args:

source_type (type): The trace source type to remove all converters for

Returns:

int: Number of converters removed

classmethod clear_registry() -> int#

Clear all registered adapters. Useful for testing cleanup.

Returns:

int: Total number of converters removed
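The return values of the three removal methods can be illustrated on a plain nested dict shaped like `_registered_types`. This is a sketch under assumptions, not the library's code: the module-level functions and the built-in types used as keys are placeholders, and dropping a source entry once its last converter is removed is an assumed detail.

```python
from collections.abc import Callable

# source type -> {target type -> converter}; built-in types used as placeholders
registry: dict[type, dict[type, Callable]] = {
    int: {str: str, float: float},
    bytes: {str: bytes.decode},
}


def unregister_adapter(source_type: type, target_type: type) -> bool:
    """Remove one converter; report whether it existed."""
    converters = registry.get(source_type, {})
    if target_type not in converters:
        return False
    del converters[target_type]
    if not converters:
        # Assumption: an empty source entry is dropped entirely.
        del registry[source_type]
    return True


def unregister_all_adapters(source_type: type) -> int:
    """Remove every converter for a source type; return how many were removed."""
    return len(registry.pop(source_type, {}))


def clear_registry() -> int:
    """Remove everything; return the total number of converters removed."""
    total = sum(len(converters) for converters in registry.values())
    registry.clear()
    return total


removed_one = unregister_adapter(int, str)    # converter existed
removed_again = unregister_adapter(int, str)  # already gone
removed_int = unregister_all_adapters(int)    # only int -> float was left
removed_rest = clear_registry()               # only bytes -> str was left
```

In a test suite, a call like `clear_registry()` in teardown keeps registrations from leaking between tests.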

classmethod list_registered_types() -> dict[type, dict[type, collections.abc.Callable]]#

List all registered conversions: source_type -> {target_type -> converter}.

Returns:

dict[type, dict[type, Callable]]: Nested dict mapping source types to their available target conversions

register_adapter#
unregister_adapter#
unregister_all_adapters#
clear_registry#