nat.plugins.data_flywheel.observability.processor#

Submodules#

Classes#

DFWToDictProcessor

Processor that converts a Data Flywheel record to a dictionary.

SpanToDFWRecordProcessor

Processor that converts a Span to a Data Flywheel record.

TraceAdapterRegistry

Registry for trace source to target type conversions.

Package Contents#

class DFWToDictProcessor#

Bases: nat.observability.processor.processor.Processor[DFWRecordT, dict]

Processor that converts a Data Flywheel record to a dictionary.

Serializes Pydantic DFW record models to dictionaries using model_dump_json() for consistent field aliasing and proper JSON serialization.

async process(item: DFWRecordT | None) dict#

Convert a DFW record to a dictionary.

Args:

item (DFWRecordT | None): The DFW record to convert.

Returns:

dict: The converted dictionary.

class SpanToDFWRecordProcessor(client_id: str)#

Bases: nat.observability.processor.processor.Processor[nat.data_models.span.Span, DFWRecordT | None], nat.observability.mixin.type_introspection_mixin.TypeIntrospectionMixin

Processor that converts a Span to a Data Flywheel record.

Extracts trace data from spans and uses the trace adapter registry to convert it to the target DFW record format.

_client_id#
async process(item: nat.data_models.span.Span) DFWRecordT | None#

Convert a Span to a DFW record.

Args:

item (Span): The Span to convert.

Returns:

DFWRecordT | None: The converted DFW record.

class TraceAdapterRegistry#

Registry for trace source to target type conversions.

Maintains schema detection through Pydantic unions while enabling dynamic registration of converter functions for different trace source types.

_registered_types: dict[type, dict[type, collections.abc.Callable]]#
_union_cache: Any = None#
classmethod register_adapter(
trace_source_model: type,
) collections.abc.Callable[[collections.abc.Callable], collections.abc.Callable]#

Register adapter with a trace source Pydantic model.

The model defines the schema for union-based detection, allowing automatic schema matching without explicit framework/provider specification.

Args:
trace_source_model (type): Pydantic model class that defines the trace source schema

(e.g., OpenAITraceSource, NIMTraceSource, CustomTraceSource)

Returns:

Callable: Decorator function that registers the converter

classmethod convert(
trace_container: nat.plugins.data_flywheel.observability.schema.trace_container.TraceContainer,
to_type: type,
) Any#

Convert trace to target type using registered converter function.

Args:

trace_container (TraceContainer): TraceContainer with source data to convert to_type (type): Target type to convert to

Returns:

Converted object of to_type

Raises:

ValueError: If no converter is registered for source->target combination

classmethod get_adapter(
trace_container: nat.plugins.data_flywheel.observability.schema.trace_container.TraceContainer,
to_type: type,
) collections.abc.Callable | None#

Get the converter function for a given trace source and target type.

Args:

trace_container (TraceContainer): TraceContainer with source data to_type (type): Target type to convert to

Returns:

Converter function if registered, None if not found

classmethod get_current_union() type#

Get the current source union with all registered source types.

Returns:

type: Union type containing all registered trace source types

classmethod _rebuild_union()#

Rebuild the union with all registered trace source types.

classmethod _update_trace_source_model()#

Update the TraceContainer model to use the current dynamic union.

classmethod unregister_adapter(source_type: type, target_type: type) bool#

Unregister a specific adapter.

Args:

source_type (type): The trace source type target_type (type): The target conversion type

Returns:

bool: True if adapter was found and removed, False if not found

classmethod unregister_all_adapters(source_type: type) int#

Unregister all adapters for a given source type.

Args:

source_type (type): The trace source type to remove all converters for

Returns:

int: Number of converters removed

classmethod clear_registry() int#

Clear all registered adapters. Useful for testing cleanup.

Returns:

int: Total number of converters removed

classmethod list_registered_types() dict[type, dict[type, collections.abc.Callable]]#

List all registered conversions: source_type -> {target_type -> converter}.

Returns:

dict[type, dict[type, Callable]]: Nested dict mapping source types to their available target conversions