nat.observability.exporter_manager#

Attributes#

Classes#

ExporterManager

Manages the lifecycle of asynchronous exporters.

Module Contents#

logger#
class ExporterManager(shutdown_timeout: int = 120)#

Manages the lifecycle of asynchronous exporters.

ExporterManager maintains a registry of exporters, allowing for dynamic addition and removal. It provides methods to start and stop all registered exporters concurrently, ensuring proper synchronization and lifecycle management. The manager is designed to prevent race conditions during exporter operations and to handle exporter tasks in an asyncio event loop.

Each workflow execution gets its own ExporterManager instance to manage the lifecycle of exporters during that workflow’s execution.

Exporters added after start() is called will not be started automatically. They will only be started on the next lifecycle (i.e., after a stop and subsequent start).
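The late-registration rule above can be illustrated with a small, self-contained sketch. `ToyManager` and its `add` method are hypothetical stand-ins written for this example; they are not the real ExporterManager and only mimic the documented behavior that start() works from the registry as it exists at that moment.

```python
import asyncio


class ToyManager:
    """Hypothetical stand-in, not the real ExporterManager."""

    def __init__(self):
        self.registry = {}   # name -> coroutine function
        self.tasks = {}      # name -> asyncio.Task, populated by start()

    def add(self, name, run):
        # Registering never starts anything by itself.
        self.registry[name] = run

    async def start(self):
        # Snapshot the registry: only exporters present now get a task.
        for name, run in list(self.registry.items()):
            self.tasks[name] = asyncio.create_task(run())

    async def stop(self):
        await asyncio.gather(*self.tasks.values())
        self.tasks.clear()


async def demo():
    started = []

    def make(name):
        async def run():
            started.append(name)
        return run

    mgr = ToyManager()
    mgr.add("a", make("a"))
    await mgr.start()
    mgr.add("b", make("b"))      # added after start(): no task is created
    await mgr.stop()
    first_cycle = list(started)

    started.clear()
    await mgr.start()            # next lifecycle picks up "b" as well
    await mgr.stop()
    return first_cycle, sorted(started)


first_cycle, second_cycle = asyncio.run(demo())
print(first_cycle, second_cycle)
```

In the first cycle only "a" runs; "b" runs only once a new start follows the stop.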

Args:

shutdown_timeout (int, optional): Maximum time in seconds to wait for exporters to shut down gracefully. Defaults to 120 seconds.

Initialize the ExporterManager.

_tasks: dict[str, asyncio.Task]#
_running: bool = False#
_exporter_registry: dict[str, nat.observability.exporter.base_exporter.BaseExporter]#
_is_registry_shared: bool = False#
_lock: asyncio.Lock#
_shutdown_event: asyncio.Event#
_shutdown_timeout: int = 120#
_active_isolated_exporters: dict[str, nat.observability.exporter.base_exporter.BaseExporter]#
classmethod _create_with_shared_registry(
shutdown_timeout: int,
shared_registry: dict[str, nat.observability.exporter.base_exporter.BaseExporter],
) ExporterManager#

Internal factory method for creating instances with shared registry.

_ensure_registry_owned()#

Ensure we own the registry (copy-on-write).
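As a rough illustration of what copy-on-write ownership can look like, the sketch below shares one dict between an original and a clone and copies it only when the clone is about to write. `CowRegistry`, `clone`, and `_ensure_owned` are hypothetical names chosen for this example. The source does not show the actual implementation, including whether the original also re-copies after handing out a shared reference.

```python
class CowRegistry:
    """Hypothetical sketch of a copy-on-write registry holder."""

    def __init__(self, registry=None, shared=False):
        self._registry = registry if registry is not None else {}
        self._shared = shared

    def clone(self):
        # Cheap: the clone points at the same dict and is marked as shared.
        return CowRegistry(self._registry, shared=True)

    def _ensure_owned(self):
        # Copy the dict the first time a shared holder is about to mutate it.
        if self._shared:
            self._registry = dict(self._registry)
            self._shared = False

    def add(self, name, value):
        self._ensure_owned()
        self._registry[name] = value

    def names(self):
        return sorted(self._registry)


original = CowRegistry()
original.add("console", object())

clone = original.clone()
shared_before_write = clone._registry is original._registry

clone.add("file", object())   # triggers the copy; the original is untouched
print(shared_before_write, original.names(), clone.names())
```

The point of the pattern is that creating the clone costs no dict copy; the copy is paid only by a holder that actually modifies its registry.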

add_exporter(
name: str,
exporter: nat.observability.exporter.base_exporter.BaseExporter,
) None#

Add an exporter to the manager.

Args:

name (str): The unique name for the exporter.

exporter (BaseExporter): The exporter instance to add.

remove_exporter(name: str) None#

Remove an exporter from the manager.

Args:

name (str): The name of the exporter to remove.

get_exporter(
name: str,
) nat.observability.exporter.base_exporter.BaseExporter#

Get an exporter instance by name.

Args:

name (str): The name of the exporter to retrieve.

Returns:

BaseExporter: The exporter instance registered under the given name.

Raises:

ValueError: If the exporter is not found.

async get_all_exporters() dict[str, nat.observability.exporter.base_exporter.BaseExporter]#

Get all registered exporter instances.

Returns:

dict[str, BaseExporter]: A dictionary mapping exporter names to exporter instances.

create_isolated_exporters(
context_state: nat.builder.context.ContextState | None = None,
) dict[str, nat.observability.exporter.base_exporter.BaseExporter]#

Create isolated copies of all exporters for concurrent execution.

This uses copy-on-write to efficiently create isolated instances that share expensive resources but have separate mutable state.

Args:

context_state (ContextState | None, optional): The isolated context state for the new exporter instances. If not provided, a new context state will be created.

Returns:

dict[str, BaseExporter]: A dictionary of isolated exporter instances.
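One way to picture "share expensive resources but have separate mutable state" is a shallow copy followed by a reset of the per-run fields. The sketch below is an assumption-laden illustration: `ToyExporter`, its `client` and `buffer` attributes, and the `isolated` method are hypothetical and are not part of BaseExporter as documented here.

```python
import copy


class ToyExporter:
    """Hypothetical exporter; not the real BaseExporter."""

    def __init__(self, client):
        self.client = client      # expensive resource, safe to share
        self.buffer = []          # per-run mutable state

    def isolated(self):
        clone = copy.copy(self)   # shallow copy keeps the same client object
        clone.buffer = []         # fresh mutable state for the clone
        return clone


registry = {"console": ToyExporter(client=object())}

# Build one isolated instance per registered exporter.
isolated = {name: exp.isolated() for name, exp in registry.items()}
isolated["console"].buffer.append("span-1")

print(isolated["console"].client is registry["console"].client)
print(registry["console"].buffer, isolated["console"].buffer)
```

Writes to the isolated instance's buffer do not leak into the registered exporter, while both still use the same client object.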

async _cleanup_isolated_exporters()#

Explicitly clean up isolated exporter instances.

async _cleanup_single_exporter(
name: str,
exporter: nat.observability.exporter.base_exporter.BaseExporter,
)#

Clean up a single isolated exporter.

async start(context_state: nat.builder.context.ContextState | None = None)#

Start all registered exporters concurrently.

This method acquires a lock to ensure only one start/stop cycle is active at a time. It starts all currently registered exporters in their own asyncio tasks. Exporters added after this call will not be started until the next lifecycle.

Args:

context_state (ContextState | None, optional): Context state used when creating isolated exporters.

Yields:

ExporterManager: The manager instance for use within the context.

Raises:

RuntimeError: If the manager is already running.
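The "Yields" and "Raises" entries suggest an async context manager guarded by a running flag. A minimal sketch of that shape, using only the standard library, might look like the following. `ToyManager` is hypothetical, and the sketch holds the lock only while flipping the flag; the real method may hold it differently.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyManager:
    """Hypothetical stand-in, not the real ExporterManager."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._running = False

    @asynccontextmanager
    async def start(self):
        async with self._lock:
            if self._running:
                raise RuntimeError("already running")
            self._running = True
        try:
            yield self                    # the manager is usable inside the block
        finally:
            async with self._lock:
                self._running = False     # always reset, even on error


async def demo():
    mgr = ToyManager()
    error = None
    async with mgr.start() as running:
        same_object = running is mgr
        try:
            async with mgr.start():       # second start while running
                pass
        except RuntimeError as exc:
            error = str(exc)
    return same_object, error, mgr._running


same_object, error, still_running = asyncio.run(demo())
print(same_object, error, still_running)
```

The nested start fails with RuntimeError, and leaving the outer block resets the flag so a later lifecycle can begin.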

async _run_exporter(
name: str,
exporter: nat.observability.exporter.base_exporter.BaseExporter,
)#

Run an exporter in its own task.

Args:

name (str): The name of the exporter.

exporter (BaseExporter): The exporter instance to run.

async stop() None#

Stop all registered exporters.

This method signals all running exporter tasks to shut down and waits for their completion, up to the configured shutdown timeout. If any tasks do not complete in time, a warning is logged.
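The signal-then-wait-with-timeout behavior described above can be approximated with `asyncio.Event` and `asyncio.wait`. This is a sketch under stated assumptions: `stop_all` is a hypothetical helper, and cancelling the tasks that miss the deadline is an addition for tidy cleanup in the example; the description above only says a warning is logged.

```python
import asyncio
import logging

logger = logging.getLogger("toy_exporter_manager")


async def stop_all(tasks, shutdown_event, timeout):
    """Signal shutdown, wait up to `timeout` seconds, report stragglers."""
    shutdown_event.set()
    if not tasks:
        return []
    done, pending = await asyncio.wait(list(tasks.values()), timeout=timeout)
    stuck = sorted(name for name, task in tasks.items() if task in pending)
    if stuck:
        logger.warning("Exporters did not stop within %ss: %s", timeout, stuck)
    # Assumption for this sketch only: cancel stragglers so nothing is left running.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return stuck


async def demo():
    event = asyncio.Event()

    async def polite():
        await event.wait()        # exits as soon as shutdown is signalled

    async def stubborn():
        await asyncio.sleep(60)   # ignores the shutdown signal

    tasks = {
        "polite": asyncio.create_task(polite()),
        "stubborn": asyncio.create_task(stubborn()),
    }
    return await stop_all(tasks, event, timeout=0.1)


stuck = asyncio.run(demo())
print(stuck)
```

`asyncio.wait` does not raise on timeout; it returns the unfinished tasks in the second set, which is what makes the warning straightforward to produce.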

static from_exporters(
exporters: dict[str, nat.observability.exporter.base_exporter.BaseExporter],
shutdown_timeout: int = 120,
) ExporterManager#

Create an ExporterManager from a dictionary of exporters.

get() ExporterManager#

Create a copy of this ExporterManager with the same configuration using copy-on-write.

This avoids copying the registry up front: the new instance shares the registry until a modification is needed.

Returns:

ExporterManager: A new ExporterManager instance with shared exporters (copy-on-write).