nat.observability.exporter.base_exporter#
Attributes#
Classes#
Descriptor for copy-on-write isolation. |
|
Abstract base class for event exporters with isolated copy support. |
Module Contents#
- logger#
- IsolatedAttributeT#
- class IsolatedAttribute(
- factory: collections.abc.Callable[[], IsolatedAttributeT],
Bases:
Generic
[IsolatedAttributeT
]Descriptor for copy-on-write isolation.
This descriptor uses Python’s descriptor protocol to automatically manage attribute isolation during object copying. It enables efficient concurrent execution by sharing expensive resources while isolating mutable state.
Performance Note: This pattern shares expensive resources (HTTP clients, auth headers) while isolating cheap mutable state (task sets, events). Tasks are tracked for monitoring but don’t block shutdown - they complete asynchronously in the event loop. Critical for high-throughput concurrent execution.
Implementation Note: Uses Python descriptor protocol (__get__, __set__, __set_name__) for automatic attribute isolation on object copying.
- Example:
- class MyExporter(BaseExporter):
# Expensive HTTP client shared across instances _client = expensive_http_client
# Cheap mutable state isolated per instance _tasks: IsolatedAttribute[set] = IsolatedAttribute(set)
exporter1 = MyExporter(endpoint=”https://api.service.com”) exporter2 = exporter1.create_isolated_instance(context) # exporter2 shares _client but has isolated _tasks tracking
- factory#
- reset_for_copy(obj)#
Reset the attribute for a copied object.
- class BaseExporter(
- context_state: nat.builder.context.ContextState | None = None,
Bases:
nat.observability.exporter.exporter.Exporter
Abstract base class for event exporters with isolated copy support.
This class provides the foundation for creating event exporters that can handle concurrent execution through copy-on-write isolation. It manages the lifecycle of event subscriptions and provides hooks for processing events.
The class supports isolation for concurrent execution by automatically resetting mutable state when creating isolated copies using descriptors.
- Performance Design:
Export tasks run asynchronously in the event loop background
stop() method does not wait for background tasks to complete
Tasks are tracked for monitoring but cleaned up automatically
This keeps observability “off the hot path” for optimal performance
- Args:
context_state (ContextState, optional): The context state to use for the exporter. Defaults to None.
Initialize the BaseExporter.
- _active_instances: set[weakref.ref]#
- _isolated_instances: set[weakref.ref]#
- _tasks: IsolatedAttribute[set[asyncio.Task]]#
- _ready_event: IsolatedAttribute[asyncio.Event]#
- _shutdown_event: IsolatedAttribute[asyncio.Event]#
- _context_state = None#
- _subscription = None#
- _running = False#
- _loop = None#
- _is_isolated_instance = False#
- classmethod _cleanup_instance_tracking(ref)#
Cleanup callback for weakref when instance is garbage collected.
- classmethod get_active_instance_count() int #
Get the number of active BaseExporter instances.
- Returns:
int: Number of active instances (cleaned up automatically via weakref)
- classmethod get_isolated_instance_count() int #
Get the number of active isolated BaseExporter instances.
- Returns:
int: Number of active isolated instances
- property is_isolated_instance: bool#
Check if this is an isolated instance.
- Returns:
bool: True if this is an isolated instance, False otherwise
- abstractmethod export( ) None #
This method is called on each event from the event stream to initiate the trace export.
This is the base implementation that can be overridden by subclasses. By default, it does nothing - subclasses should implement their specific export logic.
- Args:
event (IntermediateStep): The event to be exported.
- on_error(exc: Exception) None #
Handle an error in the event subscription.
- Args:
exc (Exception): The error to handle.
- on_complete() None #
Handle the completion of the event stream.
This method is called when the event stream is complete.
- _start() nat.utils.reactive.subject.Subject | None #
Start the exporter.
- Returns:
Subject | None: The subject to subscribe to.
- async _pre_start()#
Called before the exporter starts.
- async start() collections.abc.AsyncGenerator[None] #
Start the exporter and yield control to the caller.
- async _cleanup()#
Clean up any resources.
- async _cancel_tasks()#
Cancel all scheduled tasks.
Note: This method is NOT called during normal stop() operation for performance. It’s available for special cases where explicit task completion is needed.
- async _wait_for_tasks(timeout: float = 5.0)#
Wait for all tracked tasks to complete with a timeout.
Note: This method is NOT called during normal stop() operation for performance. It’s available for special cases where explicit task completion is needed.
- Args:
timeout (float, optional): The timeout in seconds. Defaults to 5.0.
- async stop()#
Stop the exporter immediately without waiting for background tasks.
This method performs fast shutdown by: 1. Setting running=False to prevent new export tasks 2. Signaling shutdown to waiting code 3. Cleaning up subscriptions and resources 4. Clearing task tracking (tasks continue in event loop)
Performance: Does not block waiting for background export tasks to complete. Background tasks will finish asynchronously and clean themselves up.
Note: This method is called when the exporter is no longer needed.
- async wait_ready()#
Wait for the exporter to be ready.
This method is called when the exporter is ready to export events.
- create_isolated_instance(
- context_state: nat.builder.context.ContextState,
Create an isolated copy with automatic descriptor-based state reset.
This method creates a shallow copy that shares expensive resources (HTTP clients, auth headers) while isolating mutable state through the IsolatedAttribute descriptor pattern.
- Args:
context_state: The isolated context state for the new instance
- Returns:
BaseExporter: Isolated instance sharing expensive resources