observability.processor.trace_conversion.trace_adapter_registry#
Attributes#
Classes#
Registry for trace source to target type conversions. |
Module Contents#
- logger#
- class TraceAdapterRegistry#
Registry for trace source to target type conversions.
Maintains schema detection through Pydantic unions while enabling dynamic registration of converter functions for different trace source types.
- _union_cache: Any = None#
- classmethod register_adapter(
- trace_source_model: type,
Register adapter with a trace source Pydantic model.
The model defines the schema for union-based detection, allowing automatic schema matching without explicit framework/provider specification.
- Args:
- trace_source_model (type): Pydantic model class that defines the trace source schema
(e.g., OpenAITraceSource, NIMTraceSource, CustomTraceSource)
- Returns:
Callable: Decorator function that registers the converter
- classmethod convert(
- trace_container: nat.plugins.data_flywheel.observability.schema.trace_container.TraceContainer,
- to_type: type,
Convert trace to target type using registered converter function.
- Args:
trace_container (TraceContainer): TraceContainer with source data to convert to_type (type): Target type to convert to
- Returns:
Converted object of to_type
- Raises:
ValueError: If no converter is registered for source->target combination
- classmethod get_adapter(
- trace_container: nat.plugins.data_flywheel.observability.schema.trace_container.TraceContainer,
- to_type: type,
Get the converter function for a given trace source and target type.
- Args:
trace_container (TraceContainer): TraceContainer with source data to_type (type): Target type to convert to
- Returns:
Converter function if registered, None if not found
- classmethod get_current_union() type#
Get the current source union with all registered source types.
- Returns:
type: Union type containing all registered trace source types
- classmethod _rebuild_union()#
Rebuild the union with all registered trace source types.
- classmethod _update_trace_source_model()#
Update the TraceContainer model to use the current dynamic union.
- classmethod unregister_adapter(source_type: type, target_type: type) bool#
Unregister a specific adapter.
- Args:
source_type (type): The trace source type target_type (type): The target conversion type
- Returns:
bool: True if adapter was found and removed, False if not found
- classmethod unregister_all_adapters(source_type: type) int#
Unregister all adapters for a given source type.
- Args:
source_type (type): The trace source type to remove all converters for
- Returns:
int: Number of converters removed
- register_adapter#
- unregister_adapter#
- unregister_all_adapters#
- clear_registry#