Adding Telemetry Exporters to NVIDIA NeMo Agent Toolkit#
Note: The code examples in this guide are pseudo code designed to illustrate the programming interface and key concepts. They focus on demonstrating the structure and flow rather than providing complete, runnable implementations. Use these examples to understand the interface patterns and adapt them to your specific use case.
Telemetry exporters are plugins that send telemetry data (traces, spans, intermediate steps, and similar events) from NeMo Agent toolkit workflows to external observability services. The NeMo Agent toolkit uses a flexible, plugin-based observability system that lets you configure multiple exporters simultaneously and create custom integrations for any observability platform. This guide explains how to create and register custom telemetry exporters.
Why Use Telemetry Exporters?#
Telemetry exporters solve critical observability challenges in Agentic AI workflows:
Production Monitoring#
Track workflow performance: Monitor execution times, success rates, and resource usage across your AI agents
Identify bottlenecks: Discover slow LLM calls, inefficient tool usage, or processing delays
Real-time alerting: Get notified when workflows fail or performance degrades
Debugging and Troubleshooting#
Trace execution flow: Follow the complete path of requests through your agent workflows
Debug failures: Understand exactly where and why workflows fail with detailed error context
Inspect intermediate data: See inputs, outputs, and transformations at each step
Analytics and Insights#
Usage patterns: Understand how users interact with your AI agents
Cost optimization: Track token usage, API calls, and resource consumption
Performance analysis: Identify trends and optimization opportunities
Integration and Compliance#
Enterprise observability: Connect to existing monitoring infrastructure (Datadog, etc.)
Compliance requirements: Maintain audit trails and detailed logs for regulatory compliance
Custom dashboards: Build specialized visualizations for your specific use cases
Common Use Cases#
| Scenario | Benefit | Recommended Exporter |
|---|---|---|
| Development debugging | Quick local inspection of workflow behavior | RawExporter |
| Production monitoring | Real-time performance tracking and alerting using a span-based data structure | SpanExporter |
| Enterprise integration | Connect to existing OpenTelemetry based observability stack | OtelSpanExporter |
| Custom analytics | Specialized data processing and visualization | ProcessingExporter |
| Compliance auditing | Detailed audit trails and data retention | FileExporter |
Without telemetry exporters, you’re operating blind - unable to understand performance, debug issues, or optimize your AI workflows. With telemetry exporters, you gain complete visibility into your agent operations, enabling confident production deployment and continuous improvement.
Existing Telemetry Exporters#
To view the list of locally installed and registered telemetry exporters, run the following command:
nat info components -t tracing
Examples of existing telemetry exporters include:
File: Exports traces to local files
Phoenix: Exports traces to Arize Phoenix for visualization
Weave: Exports traces to Weights & Biases Weave
Langfuse: Exports traces to Langfuse via OTLP
LangSmith: Exports traces to LangSmith via OTLP
OpenTelemetry Collector: Exports traces to OpenTelemetry-compatible services
Patronus: Exports traces to Patronus via OTLP
Galileo: Exports traces to Galileo via OTLP
RagaAI Catalyst: Exports traces to RagaAI Catalyst
Quick Start: Your First Telemetry Exporter#
Want to get started quickly? Here’s a minimal working example that creates a console exporter to print traces to the terminal:
from pydantic import Field
from nat.builder.builder import Builder
from nat.cli.register_workflow import register_telemetry_exporter
from nat.data_models.telemetry_exporter import TelemetryExporterBaseConfig
from nat.observability.exporter.raw_exporter import RawExporter
from nat.data_models.intermediate_step import IntermediateStep
# Step 1: Define configuration
class ConsoleTelemetryExporter(TelemetryExporterBaseConfig, name="console"):
prefix: str = Field(default="[TRACE]", description="Prefix for console output")
# Step 2: Create exporter class
class ConsoleExporter(RawExporter[IntermediateStep, IntermediateStep]):
"""
RawExporter[IntermediateStep, IntermediateStep] means:
- Input: IntermediateStep (raw workflow events)
- Output: IntermediateStep (no transformation needed)
"""
def __init__(self, prefix: str = "[TRACE]", context_state=None):
super().__init__(context_state=context_state)
self.prefix = prefix
async def export_processed(self, item: IntermediateStep):
print(f"{self.prefix} {item.event_type}: {item.name}")
# IntermediateStep contains workflow events with fields like:
# - event_type: The type of event (e.g., "function_call", "llm_response")
# - name: The name of the step or component
# - metadata: Additional context and data
# Step 3: Register the exporter
@register_telemetry_exporter(config_type=ConsoleTelemetryExporter)
async def console_telemetry_exporter(config: ConsoleTelemetryExporter, builder: Builder):
yield ConsoleExporter(prefix=config.prefix)
Usage in workflow.yaml:
general:
telemetry:
tracing:
console_exporter:
_type: console
prefix: "[MY_APP]"
That’s it! Your exporter will now print trace information to the console. Let’s explore more advanced features below.
Key Concepts#
Before diving into advanced features, here are the core concepts:
Configuration Class: Defines the settings your exporter needs (endpoints, API keys, etc.) and its registered name
Exporter Class: Contains the logic to process and export trace data
Registration Function: Connects your configuration to your exporter implementation
Processing Pipeline: Optional transformations applied to data before export
Isolation: Ensures concurrent workflows don’t interfere with each other
The Three-Step Pattern:
Define what settings you need (configuration)
Implement how to export data (exporter class)
Register the exporter with the toolkit (registration function)
Understanding Telemetry Exporters#
Telemetry exporters in NeMo Agent toolkit are responsible for:
Event Subscription: Listening to workflow intermediate steps
Data Processing: Transforming raw events into the target format
Export: Sending processed data to target destinations
Lifecycle Management: Handling startup, shutdown, and error conditions
Telemetry Data Flow#
The flexible telemetry export system routes workflow events through different exporter types to various destinations, as outlined below.
Exporter Types#
NeMo Agent toolkit supports several types of exporters based on the data they handle:
(Diagram: workflow events flow into three processing paths: raw processing to file, console, and custom destinations; span processing to Weave, HTTP APIs, and databases; OTLP processing to Datadog, Phoenix, and OpenTelemetry Collectors.)
Choosing the Right Exporter Type#
The following table helps you choose the appropriate exporter type for your use case:
| Exporter Type | Use When | Best For | Complexity | Development Time |
|---|---|---|---|---|
| Raw Exporter | Simple file/console output | Local development | Low | 30 minutes |
| Span Exporter | HTTP API integration | Production HTTP APIs | Medium | 2-4 hours |
| OpenTelemetry Exporter | OTLP-compatible services | Jaeger, Tempo | Low | 15-30 minutes |
| Advanced Custom Exporter | Complex business logic | Enterprise reliability patterns | High | 1-2 days |
Quick Decision Guide:
Using standard observability tools? → Use pre-built OpenTelemetry exporters (Langfuse, LangSmith, etc.)
Just getting started? → Use Raw Exporter with console or file output
Integrating with custom HTTP API? → Use Span Exporter
Need a custom OTLP service? → Create a simple config wrapper around OTLPSpanAdapterExporter
Need complex business logic with state tracking? → Advanced Custom Exporter with custom processors
Raw Exporters#
Process raw IntermediateStep events directly:
Use case: Simple file logging, custom event processing
Base class: RawExporter
Data flow: IntermediateStep → [Processing Pipeline] → OutputT → Export
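To make the OutputT part of this flow concrete, here is a minimal sketch of a raw exporter whose pipeline serializes each IntermediateStep to a string before export. It reuses the IntermediateStepSerializer processor shown later in Pattern 2; like the other examples in this guide, treat it as illustrative pseudo code.
from nat.data_models.intermediate_step import IntermediateStep
from nat.observability.exporter.raw_exporter import RawExporter
from nat.observability.processor.intermediate_step_serializer import IntermediateStepSerializer
class SerializingConsoleExporter(RawExporter[IntermediateStep, str]):
    """Raw exporter whose pipeline turns IntermediateStep events into strings (OutputT = str)."""
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # The serializer converts IntermediateStep -> str before export_processed is called
        self.add_processor(IntermediateStepSerializer())
    async def export_processed(self, item: str):
        # item has already passed through the processing pipeline
        print(item)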
Span Exporters#
Convert events into spans with lifecycle management:
Use case: Distributed tracing, span-based observability
Base class: SpanExporter
Data flow: IntermediateStep → Span → [Processing Pipeline] → OutputT → Export
OpenTelemetry Exporters#
Specialized for OpenTelemetry-compatible services with many pre-built options:
Use case: OTLP-compatible backends, standard observability tools
Base class: OtelSpanExporter
Data flow: IntermediateStep → Span → [Processing Pipeline] → OtelSpan → Export
Pre-built integrations: Langfuse, LangSmith, OpenTelemetry Collector, Patronus, Galileo, Phoenix, RagaAI, Weave
Advanced Custom Exporters#
Advanced exporters for complex analytics pipelines with state management:
Use case: Complex business logic, stateful data processing, multi-system integrations
Base class: ProcessingExporter with custom processors and advanced features
Data flow: IntermediateStep → InputT → [Enrichment Pipeline] → OutputT → Export
Key features: Circuit breakers, dead letter queues, state tracking, custom transformations, performance monitoring
Note: This is a high-complexity pattern. See the Advanced Custom Exporters section in Advanced Features for detailed implementation examples.
Note: All exporters support optional processing pipelines that can transform, filter, batch, or aggregate data before export. Common processors include batching for efficient transmission, filtering for selective export, and format conversion for compatibility with different backends.
Pre-Built Telemetry Exporters#
Before creating a custom exporter, check if your observability service is already supported:
Available Integrations#
| Service | Type | Installation | Configuration |
|---|---|---|---|
| File | | | local file or directory |
| Langfuse | | | endpoint + API keys |
| LangSmith | | | endpoint + API key |
| OpenTelemetry Collector | | | endpoint + headers |
| Patronus | | | endpoint + API key |
| Galileo | | | endpoint + API key |
| Phoenix | | | endpoint |
| RagaAI/Catalyst | | | API key + project |
| Weave | | | project name |
Simple Configuration Example#
# workflow.yaml
general:
telemetry:
tracing:
langfuse:
_type: langfuse
endpoint: https://cloud.langfuse.com/api/public/otel/v1/traces
public_key: ${LANGFUSE_PUBLIC_KEY}
secret_key: ${LANGFUSE_SECRET_KEY}
Most services use OTLP: If your service supports OpenTelemetry Protocol (OTLP), you can often subclass OtelSpanExporter or use the generic otelcollector type with appropriate headers.
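For example, a workflow configuration along these lines could point the generic otelcollector type at an OTLP/HTTP backend. The field names here are an assumption for illustration; run nat info components -t tracing and check the exporter's schema for the fields it actually accepts.
general:
  telemetry:
    tracing:
      my_otlp_backend:
        _type: otelcollector
        # Hypothetical fields: the OTLP/HTTP endpoint plus any authentication headers
        endpoint: https://collector.example.com/v1/traces
        headers:
          Authorization: Bearer ${MY_BACKEND_API_KEY}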
Creating a Custom Telemetry Exporter#
This section provides detailed guidance for creating production-ready telemetry exporters. If you just want to get started quickly, see the Quick Start section first.
Step 1: Define the Configuration Class#
Create a configuration class that inherits from TelemetryExporterBaseConfig:
from pydantic import Field
from nat.data_models.telemetry_exporter import TelemetryExporterBaseConfig
class CustomTelemetryExporter(TelemetryExporterBaseConfig, name="custom"):
"""A simple custom telemetry exporter for sending traces to a custom service."""
# Required fields
endpoint: str = Field(description="The endpoint URL for the custom service")
api_key: str = Field(description="API key for authentication")
Tip: Start with the fields you need and add more as your integration becomes more sophisticated. See the Common Integration Patterns section for practical examples.
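The exporter and registration examples in Steps 2 and 3 below also reference a project field and a few optional tuning knobs. Here is a sketch of how the configuration might grow; the field names and defaults are illustrative assumptions, not part of the toolkit:
class CustomTelemetryExporter(TelemetryExporterBaseConfig, name="custom"):
    """Custom telemetry exporter configuration with optional tuning fields."""
    # Required fields
    endpoint: str = Field(description="The endpoint URL for the custom service")
    api_key: str = Field(description="API key for authentication")
    project: str = Field(description="Project name used to group traces")
    # Optional tuning fields assumed by the Step 3 registration example
    batch_size: int = Field(default=100, description="Number of items to send per request")
    timeout: float = Field(default=30.0, description="Request timeout in seconds")
    retries: int = Field(default=3, description="Number of retry attempts for failed exports")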
Step 2: Implement the Exporter Class#
Choose the appropriate base class based on your needs:
Raw Exporter (for simple trace exports)#
from nat.observability.exporter.raw_exporter import RawExporter
from nat.data_models.intermediate_step import IntermediateStep
class CustomRawExporter(RawExporter[IntermediateStep, IntermediateStep]):
"""A custom raw exporter that processes intermediate steps directly."""
def __init__(self, endpoint: str, api_key: str, project: str, **kwargs):
super().__init__(**kwargs)
# Store configuration
self.endpoint = endpoint
self.api_key = api_key
self.project = project
async def export_processed(self, item: IntermediateStep):
"""Export the intermediate step to the custom service."""
# Transform and send data
payload = {
"project": self.project,
"event_type": item.event_type,
"name": item.payload.name if item.payload else None,
"timestamp": item.event_timestamp
}
# Send to your service (implement _send_to_service method)
await self._send_to_service(payload)
async def _cleanup(self):
"""Clean up resources when the exporter is stopped."""
# Clean up HTTP sessions, file handles, etc.
await super()._cleanup()
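The _send_to_service helper is intentionally left unimplemented above. A minimal sketch of what it could look like with aiohttp, assuming the target service accepts JSON POSTs with a bearer token; the payload shape and endpoint are whatever your service expects:
import aiohttp  # plugin-specific dependency; in production, import it lazily (see the note in Step 3)
from nat.data_models.intermediate_step import IntermediateStep
from nat.observability.exporter.raw_exporter import RawExporter
class CustomRawExporter(RawExporter[IntermediateStep, IntermediateStep]):
    # __init__ and export_processed as shown above
    async def _send_to_service(self, payload: dict):
        """POST a single event to the custom service (illustrative JSON API)."""
        # Create the session lazily so constructing the exporter performs no I/O
        if getattr(self, "_session", None) is None:
            self._session = aiohttp.ClientSession()
        headers = {"Authorization": f"Bearer {self.api_key}"}
        async with self._session.post(self.endpoint, json=payload, headers=headers) as response:
            response.raise_for_status()
    async def _cleanup(self):
        """Close the HTTP session, then run the base class cleanup."""
        if getattr(self, "_session", None) is not None:
            await self._session.close()
        await super()._cleanup()
The CustomSpanExporter in the next example can reuse the same helper.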
Span Exporter (for span-based tracing)#
from nat.data_models.span import Span
from nat.observability.exporter.span_exporter import SpanExporter
from nat.observability.processor.processor import Processor
class SpanToDictProcessor(Processor[Span, dict]):
"""Processor that transforms Span objects to dictionaries."""
async def process(self, item: Span) -> dict:
"""Transform a Span object to a dictionary."""
return {
"span_id": item.context.span_id if item.context else None,
"trace_id": item.context.trace_id if item.context else None,
"parent_span_id": item.context.parent_span_id if item.context else None,
"name": item.name,
"start_time": item.start_time,
"end_time": item.end_time,
"duration": item.duration,
"status": item.status,
"attributes": item.attributes,
"events": item.events,
"links": item.links
}
class CustomSpanExporter(SpanExporter[Span, dict]):
"""A custom span exporter that sends spans to a custom service."""
def __init__(self, endpoint: str, api_key: str, project: str, **kwargs):
super().__init__(**kwargs)
# Store configuration and initialize resources
self.endpoint = endpoint
self.api_key = api_key
self.project = project
# Add the processor to transform Span to dict
self.add_processor(SpanToDictProcessor())
async def export_processed(self, item: dict):
"""Export the processed span to the custom service."""
# The item is now a dict thanks to SpanToDictProcessor
payload = {
"project": self.project,
"span": item
}
# Send to your service
await self._send_to_service(payload)
async def _cleanup(self):
"""Clean up resources when the exporter is stopped."""
# Clean up HTTP sessions, file handles, etc.
await super()._cleanup()
OpenTelemetry Exporter (for OTLP compatibility)#
Note: OpenTelemetry exporters require the nvidia-nat-opentelemetry subpackage. Install it with:
pip install nvidia-nat[opentelemetry]
For most OTLP-compatible services, use the pre-built OTLPSpanAdapterExporter:
from nat.plugins.opentelemetry.otlp_span_adapter_exporter import OTLPSpanAdapterExporter
# See Pattern 3 in Common Integration Patterns for full example
Tip: For complete implementation examples with HTTP sessions, error handling, and cleanup, see the Common Integration Patterns section.
Warning: Always implement _cleanup() and call await super()._cleanup() to prevent resource leaks. Failure to properly clean up HTTP sessions, file handles, or database connections can cause memory leaks and connection pool exhaustion in production environments.
Step 3: Register the Exporter#
Create a registration function using the @register_telemetry_exporter decorator:
import logging
from nat.builder.builder import Builder
from nat.cli.register_workflow import register_telemetry_exporter
logger = logging.getLogger(__name__)
@register_telemetry_exporter(config_type=CustomTelemetryExporter)
async def custom_telemetry_exporter(config: CustomTelemetryExporter, builder: Builder):
"""Create a custom telemetry exporter."""
try:
# Initialize the exporter with configuration
exporter = CustomSpanExporter(
endpoint=config.endpoint,
api_key=config.api_key,
project=config.project,
batch_size=config.batch_size,
timeout=config.timeout,
retries=config.retries
)
# Yield the exporter (async context manager pattern)
yield exporter
except Exception as ex:
logger.error(f"Failed to create custom telemetry exporter: {ex}", exc_info=True)
raise
Important: For plugin-specific imports (like aiohttp, OpenTelemetry modules, or other external dependencies), always import them inside the registration function to enable lazy loading. This prevents long startup times when these plugins aren't needed.
Best Practices for Code Organization#
In production code, structure your telemetry exporter as follows:
my_plugin/exporters.py:
import aiohttp
from nat.data_models.span import Span
from nat.observability.exporter.span_exporter import SpanExporter
class MyCustomExporter(SpanExporter[Span, dict]):
"""Custom exporter implementation."""
def __init__(self, endpoint: str, api_key: str, **kwargs):
super().__init__(**kwargs)
self.endpoint = endpoint
self.api_key = api_key
self.session = aiohttp.ClientSession()
async def export_processed(self, item: dict):
# Implementation here
pass
async def _cleanup(self):
"""Clean up resources when the exporter is stopped."""
# Clean up HTTP sessions, file handles, etc.
await super()._cleanup()
my_plugin/register.py:
from pydantic import Field
from nat.cli.register_workflow import register_telemetry_exporter
from nat.data_models.telemetry_exporter import TelemetryExporterBaseConfig
from nat.builder.builder import Builder
# Configuration class can be in the same file as registration
class MyTelemetryExporter(TelemetryExporterBaseConfig, name="my_exporter"):
endpoint: str = Field(description="Service endpoint URL")
api_key: str = Field(description="API key for authentication")
@register_telemetry_exporter(config_type=MyTelemetryExporter)
async def my_telemetry_exporter(config: MyTelemetryExporter, builder: Builder):
# Import only when the exporter is actually used
from .exporters import MyCustomExporter
yield MyCustomExporter(
endpoint=config.endpoint,
api_key=config.api_key
)
Why this pattern?
Lazy loading: Plugin dependencies are only loaded when the exporter is used
Clean separation: Business logic is separate from registration
Maintainability: Classes are easier to test and modify when properly organized
Performance: Avoids importing heavy dependencies during application startup
Note: Configuration classes are lightweight and can be defined in the same file as registration functions. The separation is primarily for exporter implementation classes that have heavy dependencies.
Note: For OpenTelemetry exporters with custom protocols, see the Advanced Features section for mixin patterns and complex integrations.
Step 4: Add Processing Pipeline (Optional)#
If your exporter needs to transform data before export, add processors to the pipeline. This is especially important when using SpanExporter[Span, dict] to convert Span objects to dictionaries:
from nat.data_models.span import Span
from nat.observability.processor.processor import Processor
class SpanToDictProcessor(Processor[Span, dict]):
"""Processor that transforms Span objects to dictionaries."""
async def process(self, item: Span) -> dict:
"""Transform a Span object to a dictionary."""
return {
"span_id": item.context.span_id if item.context else None,
"trace_id": item.context.trace_id if item.context else None,
"parent_span_id": item.context.parent_span_id if item.context else None,
"name": item.name,
"start_time": item.start_time,
"end_time": item.end_time,
"duration": item.duration,
"status": item.status,
"attributes": item.attributes,
"events": item.events
}
class CustomFieldProcessor(Processor[dict, dict]):
"""Processor that adds custom fields to the data."""
async def process(self, item: dict) -> dict:
"""Add custom fields to the dictionary."""
return {
**item,
"custom_field": self._extract_custom_data(item),
"processed_at": self._get_current_timestamp()
}
def _extract_custom_data(self, item):
"""Extract custom data from the item."""
# Add custom transformation logic
return item.get("attributes", {}).get("custom", {})
    def _get_current_timestamp(self):
        """Get the current UTC timestamp in ISO format."""
        from datetime import datetime, timezone
        return datetime.now(timezone.utc).isoformat()
# Add processors to your exporter
class CustomSpanExporter(SpanExporter[Span, dict]):
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Add processors to the pipeline (they run in order)
self.add_processor(SpanToDictProcessor()) # First: Span -> dict
self.add_processor(CustomFieldProcessor()) # Second: add custom fields
Common processor patterns:
Span to dict transformation: Convert Span objects to dictionaries
Field filtering: Remove sensitive or unnecessary fields (see the sketch below)
Field transformation: Convert timestamps, normalize data formats
Custom enrichment: Add metadata, context, or computed fields
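As an example of the field-filtering pattern, a small processor like the following could drop sensitive keys before export. The set of sensitive field names is illustrative; adjust it for your own data.
from nat.observability.processor.processor import Processor
class SensitiveFieldFilterProcessor(Processor[dict, dict]):
    """Processor that removes sensitive attribute keys from span dictionaries before export."""
    SENSITIVE_KEYS = {"api_key", "authorization", "password", "token"}
    async def process(self, item: dict) -> dict:
        """Return a copy of the item with sensitive attribute keys removed."""
        attributes = {
            key: value
            for key, value in (item.get("attributes") or {}).items()
            if key.lower() not in self.SENSITIVE_KEYS
        }
        return {**item, "attributes": attributes}
Add it to the pipeline after SpanToDictProcessor with self.add_processor(SensitiveFieldFilterProcessor()).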
Step 5: Configure in Workflow#
Once registered, configure your telemetry exporter in your workflow configuration. The flexible observability system allows you to configure multiple exporters simultaneously by adding them to the tracing section:
# workflow.yaml
general:
telemetry:
tracing:
# Your custom exporter
custom_exporter:
_type: custom
endpoint: https://api.custom-service.com/traces
api_key: ${CUSTOM_API_KEY}
# Multiple exporters can be configured simultaneously
phoenix_local:
_type: phoenix
endpoint: http://localhost:6006/v1/traces
project: my-project
Next Steps: You now have a complete custom telemetry exporter! For real-world implementation examples, see the Common Integration Patterns section. For advanced features like concurrent execution and performance optimization, see the Advanced Features section.
Common Integration Patterns#
These patterns show example exporter implementations. When implementing these in your own registration functions, remember to move plugin-specific imports (like aiohttp and OpenTelemetry modules) inside the registration function for lazy loading.
Pattern 1: HTTP API with Authentication#
Most observability services use HTTP APIs with token authentication:
import aiohttp
from nat.data_models.span import Span
from nat.observability.exporter.span_exporter import SpanExporter
from nat.observability.processor.processor import Processor
class SpanToDictProcessor(Processor[Span, dict]):
"""Processor that transforms Span objects to dictionaries."""
async def process(self, item: Span) -> dict:
"""Transform a Span object to a dictionary."""
return {
"span_id": item.context.span_id if item.context else None,
"trace_id": item.context.trace_id if item.context else None,
"name": item.name,
"start_time": item.start_time,
"end_time": item.end_time,
"attributes": item.attributes
}
class HTTPServiceExporter(SpanExporter[Span, dict]):
def __init__(self, endpoint: str, api_key: str, **kwargs):
super().__init__(**kwargs)
self.endpoint = endpoint
self.headers = {"Authorization": f"Bearer {api_key}"}
self.session = aiohttp.ClientSession()
# Add processor to transform Span to dict
self.add_processor(SpanToDictProcessor())
async def export_processed(self, item: dict):
# item is now a dict thanks to SpanToDictProcessor
async with self.session.post(
self.endpoint,
json=item,
headers=self.headers
) as response:
response.raise_for_status()
async def _cleanup(self):
"""Clean up HTTP session."""
await self.session.close()
await super()._cleanup()
Pattern 2: File-based Export#
For local development and debugging:
import asyncio
import aiofiles
from nat.data_models.intermediate_step import IntermediateStep
from nat.observability.exporter.raw_exporter import RawExporter
from nat.observability.processor.intermediate_step_serializer import IntermediateStepSerializer
class FileExporter(RawExporter[IntermediateStep, str]):
    def __init__(self, filepath: str, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath
        self.lock = asyncio.Lock()
        self.add_processor(IntermediateStepSerializer())
    async def export_processed(self, item: str):
        # Serialize writes so concurrent exports don't interleave lines
        async with self.lock:
            async with aiofiles.open(self.filepath, mode="a") as f:
                await f.write(item + "\n")
Pattern 3: Quick OpenTelemetry Integration#
For standard OTLP services, use the pre-built adapter:
@register_telemetry_exporter(config_type=MyTelemetryExporter)
async def my_telemetry_exporter(config: MyTelemetryExporter, builder: Builder):
# Import inside the function for lazy loading
from nat.plugins.opentelemetry.otlp_span_adapter_exporter import OTLPSpanAdapterExporter
yield OTLPSpanAdapterExporter(
endpoint=config.endpoint,
headers={"Authorization": f"Bearer {config.api_key}"},
        batch_size=config.batch_size  # assumes your config defines an optional batch_size field
)
Summary: You now have three proven patterns for telemetry integration:
Pattern 1 (HTTP API): Most common for cloud services and APIs
Pattern 2 (File Export): Perfect for development and debugging
Pattern 3 (OTLP): Use when your service supports OpenTelemetry standards
For basic integrations, these patterns cover 90% of use cases. Continue to Advanced Features only if you need concurrent execution, high-performance batching, or advanced error handling.
Advanced Features#
This section covers advanced topics for production-ready telemetry exporters. Choose the sections relevant to your use case:
Concurrent Execution: Required for multi-user or multi-workflow applications
Custom OpenTelemetry Protocols: Advanced OpenTelemetry integration patterns
Performance Optimization: Batching, connection management, and efficiency
Reliability: Error handling, retries, and resilience
Advanced Custom Exporters: State-aware processing, data warehouses, and complex pipelines
Concurrent Execution#
Isolated Attributes for Concurrent Execution#
Note: If you’re only running one workflow at a time, you can skip this section. However, if your application runs multiple concurrent workflows or serves multiple users simultaneously, proper isolation is critical to prevent data corruption and race conditions.
When multiple workflows run simultaneously, each needs its own isolated exporter state. NeMo Agent toolkit provides IsolatedAttribute to handle this automatically.
The Problem#
Without isolation, concurrent workflows would share the same exporter instance, leading to:
Mixed-up trace data between workflows
Race conditions in processing queues
Incorrect metrics and task tracking
The Solution: IsolatedAttribute#
IsolatedAttribute creates separate state for each workflow while sharing expensive resources:
import aiohttp
from collections import deque
from nat.data_models.span import Span
from nat.observability.exporter.base_exporter import IsolatedAttribute
from nat.observability.exporter.span_exporter import SpanExporter
class MyExporter(SpanExporter[Span, dict]):
# Isolated mutable state per workflow (safe)
_processing_queue: IsolatedAttribute[deque] = IsolatedAttribute(deque)
_metrics: IsolatedAttribute[dict] = IsolatedAttribute(dict)
def __init__(self, endpoint: str, api_key: str, **kwargs):
super().__init__(**kwargs)
# Instance-level resources - each exporter gets its own
self.endpoint = endpoint
self.session = aiohttp.ClientSession()
self.headers = {"Authorization": f"Bearer {api_key}"}
Built-in Usage: The base exporter classes already use IsolatedAttribute for core functionality:
BaseExporter uses it for _tasks, _ready_event, and _shutdown_event
SpanExporter uses it for _outstanding_spans, _span_stack, and _metadata_stack
This ensures that each isolated instance has its own task tracking and span lifecycle management.
Usage in Exporters#
import uuid
import aiohttp
from collections import deque
from nat.data_models.span import Span
from nat.observability.exporter.base_exporter import IsolatedAttribute
from nat.observability.exporter.span_exporter import SpanExporter
class MyCustomExporter(SpanExporter[Span, dict]):
"""Custom exporter with isolated state management."""
# Isolated mutable state per workflow (safe)
_processing_queue: IsolatedAttribute[deque] = IsolatedAttribute(deque)
_active_requests: IsolatedAttribute[set] = IsolatedAttribute(set)
_export_metrics: IsolatedAttribute[dict] = IsolatedAttribute(dict)
def __init__(self, endpoint: str, api_key: str, **kwargs):
super().__init__(**kwargs)
# Store configuration as instance variables
self.endpoint = endpoint
self.api_key = api_key
# Create HTTP client and headers per instance
self.session = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(limit=100),
timeout=aiohttp.ClientTimeout(total=30)
)
self.headers = {"Authorization": f"Bearer {api_key}"}
async def export_processed(self, item: dict):
"""Export with isolated state tracking."""
# Use isolated attributes for mutable state
self._processing_queue.append(item)
request_id = str(uuid.uuid4())
self._active_requests.add(request_id)
try:
# Use instance HTTP client and headers
async with self.session.post(
self.endpoint,
json=item,
headers=self.headers
) as response:
if response.status == 200:
self._export_metrics['success'] = self._export_metrics.get('success', 0) + 1
else:
self._export_metrics['failure'] = self._export_metrics.get('failure', 0) + 1
finally:
self._active_requests.discard(request_id)
if self._processing_queue:
self._processing_queue.popleft()
async def _cleanup(self):
"""Clean up HTTP session."""
await self.session.close()
await super()._cleanup()
How Isolation Works#
When create_isolated_instance() is called, the IsolatedAttribute descriptor automatically:
Shares expensive resources: HTTP clients, authentication headers, etc.
Isolates mutable state: Each instance gets its own queue, metrics, tracking sets
Maintains thread safety: No locks needed for concurrent access
# Original exporter
exporter1 = MyCustomExporter(endpoint="https://api.service1.com", api_key="service1-key")
exporter1._processing_queue.append("item1")
exporter1._export_metrics['success'] = 5
# Create isolated instance
from nat.builder.context import ContextState
context_state = ContextState.get()
exporter2 = exporter1.create_isolated_instance(context_state)
# Isolated state - each has independent data
assert len(exporter1._processing_queue) == 1 # Has "item1"
assert len(exporter2._processing_queue) == 0 # Empty queue
assert exporter1._export_metrics['success'] == 5 # Original metrics
assert len(exporter2._export_metrics) == 0 # Fresh metrics
# Shared resources - same HTTP session
assert exporter1.session is exporter2.session # Same session
Best Practices for IsolatedAttribute#
Use IsolatedAttribute for:
Task tracking sets
Processing queues
Metrics dictionaries
Event tracking state
Temporary buffers
Request counters
Don’t use IsolatedAttribute for:
HTTP clients (expensive to create)
Authentication tokens
Configuration settings
Database connections
Logger instances
Example with Common Patterns:
from collections import deque
import aiohttp
from nat.data_models.span import Span
from nat.observability.exporter.base_exporter import IsolatedAttribute
from nat.observability.exporter.span_exporter import SpanExporter
class BatchingExporter(SpanExporter[Span, dict]):
"""Exporter demonstrating common IsolatedAttribute patterns."""
# Isolated mutable state per workflow (safe)
_batch_queue: IsolatedAttribute[deque] = IsolatedAttribute(deque)
_flush_timer: IsolatedAttribute[dict] = IsolatedAttribute(dict)
_statistics: IsolatedAttribute[dict] = IsolatedAttribute(
lambda: {"batches_sent": 0, "items_processed": 0, "errors": 0}
)
def __init__(self, batch_size: int = 100, endpoint: str = "https://your-service.com/api/spans", **kwargs):
super().__init__(**kwargs)
self.batch_size = batch_size
self.endpoint = endpoint
# Define headers once during initialization
self.headers = {
"Content-Type": "application/json"
}
        # Create the HTTP session once and reuse it (aiohttp is already imported at the top of this module)
        self.session = aiohttp.ClientSession()
async def export_processed(self, item: dict):
"""Export with batching and isolated state."""
# Add to isolated batch queue
self._batch_queue.append(item)
self._statistics['items_processed'] += 1
# Flush if batch is full
if len(self._batch_queue) >= self.batch_size:
await self._flush_batch()
async def _flush_batch(self):
"""Flush batch with isolated state management."""
if not self._batch_queue:
return
# Create batch from isolated queue
batch = list(self._batch_queue)
self._batch_queue.clear()
try:
# Send batch directly with proper error handling
await self._send_batch(batch)
self._statistics['batches_sent'] += 1
except Exception as e:
self._statistics['errors'] += 1
# In production, you might want to retry or use a dead letter queue
raise
async def _send_batch(self, batch: list[dict]):
"""Send batch to the service."""
payload = {"spans": batch}
# Use the reusable session and headers
async with self.session.post(
self.endpoint,
json=payload,
headers=self.headers
) as response:
response.raise_for_status()
async def _cleanup(self):
"""Clean up HTTP session."""
if hasattr(self, 'session') and self.session:
await self.session.close()
await super()._cleanup()
Custom OpenTelemetry Protocols#
Use Case: When you need to integrate with an OpenTelemetry-compatible service that requires custom authentication, headers, or data transformation.
For OpenTelemetry exporters with custom protocols, create a simple mixin that handles authentication and HTTP transport:
# In production, define these classes in a separate module (e.g., exporters.py)
import aiohttp
from nat.plugins.opentelemetry.otel_span import OtelSpan
class CustomProtocolMixin:
"""Simple mixin for custom authentication and HTTP transport."""
def __init__(self, *args, endpoint: str, api_key: str, **kwargs):
"""Initialize the custom protocol mixin."""
self.endpoint = endpoint
self.api_key = api_key
# Define headers once during initialization
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
self.session = aiohttp.ClientSession()
super().__init__(*args, **kwargs)
async def export_otel_spans(self, spans: list[OtelSpan]):
"""Export spans using the custom protocol."""
# Simple payload - send spans with minimal wrapping
payload = {
"spans": [
{
"name": span.name,
"span_id": span.get_span_context().span_id,
"trace_id": span.get_span_context().trace_id,
"start_time": span.start_time,
"end_time": span.end_time,
"attributes": dict(span.attributes) if span.attributes else {}
}
for span in spans
]
}
# Send to service with custom headers
async with self.session.post(
self.endpoint,
json=payload,
headers=self.headers
) as response:
response.raise_for_status()
async def _cleanup(self):
"""Clean up HTTP session."""
await self.session.close()
await super()._cleanup()
# In production, you would define this in a separate module and import OtelSpanExporter there
# For example: from nat.plugins.opentelemetry.otel_span_exporter import OtelSpanExporter
# class CustomServiceExporter(CustomProtocolMixin, OtelSpanExporter):
# """Simple exporter combining custom protocol with OpenTelemetry span processing."""
# def __init__(self, endpoint: str, api_key: str, **kwargs):
# super().__init__(endpoint=endpoint, api_key=api_key, **kwargs)
@register_telemetry_exporter(config_type=CustomTelemetryExporter)
async def custom_telemetry_exporter(config: CustomTelemetryExporter, builder: Builder):
"""Create a custom telemetry exporter using the mixin pattern."""
# In production, import your exporter classes from a separate module:
# from .exporters import CustomServiceExporter
# For this example, we'll create a simple combined class here
from nat.plugins.opentelemetry.otel_span_exporter import OtelSpanExporter
class CustomServiceExporter(CustomProtocolMixin, OtelSpanExporter):
"""Simple exporter combining custom protocol with OpenTelemetry span processing."""
def __init__(self, endpoint: str, api_key: str, **kwargs):
super().__init__(endpoint=endpoint, api_key=api_key, **kwargs)
yield CustomServiceExporter(
endpoint=config.endpoint,
api_key=config.api_key
)
For Complex Transformations: This example shows basic field mapping. If you need complex data transformations, filtering, or enrichment, consider using dedicated Processor classes instead of inline transformations. Processors are reusable, testable, and can be chained for complex pipelines.
Performance Optimization#
Batching Support#
Use Case: High-throughput applications generating hundreds or thousands of traces per second.
Conceptual Flow:
1. Configure BatchingProcessor with size/time limits
2. Add processor to exporter pipeline
3. Handle both individual items and batches in export_processed()
4. Transform data to target format
5. Send HTTP request with batched payload
Implementation Pattern:
class BatchingExporter(RawExporter[IntermediateStep, IntermediateStep]):
def __init__(self, endpoint, api_key, batch_size=100, flush_interval=5.0):
super().__init__()
# Store connection details
self.endpoint = endpoint
self.session = aiohttp.ClientSession()
self.headers = {"Authorization": f"Bearer {api_key}"}
# Add batching with size and time triggers
self.add_processor(BatchingProcessor[IntermediateStep](
batch_size=batch_size,
flush_interval=flush_interval
))
async def export_processed(self, item: IntermediateStep | list[IntermediateStep]):
# Handle both single items and batches from processor
items = item if isinstance(item, list) else [item]
await self._send_batch(items)
async def _send_batch(self, items: list[IntermediateStep]):
# Transform to target format
payload = {"events": [self._transform_item(item) for item in items]}
# Send to service
async with self.session.post(self.endpoint, json=payload, headers=self.headers) as response:
response.raise_for_status()
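The _transform_item helper above is left to you. A minimal sketch that maps an IntermediateStep to a flat event dictionary, mirroring the fields used in the earlier raw exporter example (the exact schema is whatever your service expects):
    def _transform_item(self, item: IntermediateStep) -> dict:
        """Map an IntermediateStep to the target service's event schema (illustrative fields)."""
        return {
            "event_type": item.event_type,
            "name": item.payload.name if item.payload else None,
            "timestamp": item.event_timestamp,
        }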
Key Features of BatchingProcessor:
Size-based batching: Flushes when batch_size items are accumulated
Time-based batching: Flushes after flush_interval seconds
Auto-wired callbacks: Callbacks are automatically set up when the processor is added to an exporter
Shutdown safety: Processes all queued items during cleanup
Overflow handling: Configurable drop behavior when the queue is full
Statistics: Built-in metrics for monitoring performance
Configuration Options:
BatchingProcessor[T](
batch_size=100, # Items per batch
flush_interval=5.0, # Seconds between flushes
max_queue_size=1000, # Maximum queue size
drop_on_overflow=False, # Drop items vs. force flush
shutdown_timeout=10.0 # Shutdown timeout
)
Reliability#
Error Handling and Retries#
Use Case: Production environments where network issues or service outages are common.
Implement robust error handling:
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
from nat.data_models.span import Span
from nat.observability.exporter.span_exporter import SpanExporter
logger = logging.getLogger(__name__)
class ResilientExporter(SpanExporter[Span, dict]):
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def export_processed(self, item: dict):
"""Export with retry logic."""
try:
await self._export_to_service(item)
except Exception as ex:
logger.warning(f"Export failed, retrying: {ex}")
raise
Connection Management#
Use Case: Long-running services that need optimized connection pooling and lifecycle management.
Conceptual Flow:
1. Override start() method with async context manager
2. Configure connection pool settings (limits, timeouts, DNS cache)
3. Create HTTP session with optimized settings
4. Assign session to instance for use in export_processed()
5. Automatically clean up session when exporter stops
Implementation Pattern:
import aiohttp
from contextlib import asynccontextmanager
from nat.data_models.span import Span
from nat.observability.exporter.span_exporter import SpanExporter
class ConnectionManagedExporter(SpanExporter[Span, dict]):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.session = None
@asynccontextmanager
async def start(self):
# Configure connection pool
connector = aiohttp.TCPConnector(limit=100, ttl_dns_cache=300)
timeout = aiohttp.ClientTimeout(total=30)
# Create managed session
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
self.session = session
async with super().start():
yield # Session automatically closed when context exits
Advanced Custom Exporters#
Advanced Custom Exporters are for complex scenarios that require enterprise-grade patterns like circuit breakers, dead letter queues, stateful processing, and multi-backend coordination.
For most use cases, the simpler OpenTelemetry, Span, or Raw exporter patterns are sufficient and recommended. Consider this complexity level only when you have specific enterprise requirements that cannot be met with standard patterns.
Testing Your Exporter#
Create tests for your exporter:
import pytest
from unittest.mock import AsyncMock, patch
from nat.data_models.intermediate_step import IntermediateStep
@pytest.fixture
def custom_exporter():
return CustomSpanExporter(
endpoint="https://test.example.com",
api_key="test-key",
project="test-project"
)
@pytest.mark.asyncio
async def test_export_processed(custom_exporter):
"""Test that export_processed sends data correctly."""
with patch.object(custom_exporter, '_send_to_service', new_callable=AsyncMock) as mock_send:
test_item = {"span_id": "123", "name": "test_span"}
await custom_exporter.export_processed(test_item)
mock_send.assert_called_once()
        sent_data = mock_send.call_args[0][0]
        assert sent_data["project"] == "test-project"
        assert sent_data["span"]["span_id"] == "123"
def test_isolated_attributes():
"""Test that isolated attributes work correctly across instances."""
from nat.builder.context import ContextState
    # Create original exporter
    # (assumes CustomSpanExporter declares the _processing_queue, _active_requests, and
    # _export_metrics IsolatedAttribute fields shown in the Advanced Features section)
    exporter1 = CustomSpanExporter(
endpoint="https://test.example.com",
api_key="test-key",
project="test-project"
)
# Add data to first exporter's isolated attributes
exporter1._processing_queue.append("item1")
exporter1._active_requests.add("request1")
exporter1._export_metrics["success"] = 5
# Create isolated instance
context_state = ContextState.get()
exporter2 = exporter1.create_isolated_instance(context_state)
# Add different data to second exporter
exporter2._processing_queue.append("item2")
exporter2._active_requests.add("request2")
exporter2._export_metrics["failure"] = 3
# Test isolation - each exporter has its own state
assert len(exporter1._processing_queue) == 1
assert "item1" in exporter1._processing_queue
assert "item2" not in exporter1._processing_queue
assert len(exporter2._processing_queue) == 1
assert "item2" in exporter2._processing_queue
assert "item1" not in exporter2._processing_queue
# Test independent metrics
assert exporter1._export_metrics["success"] == 5
assert "failure" not in exporter1._export_metrics
assert exporter2._export_metrics["failure"] == 3
assert "success" not in exporter2._export_metrics
# Test request tracking isolation
assert "request1" in exporter1._active_requests
assert "request2" not in exporter1._active_requests
assert "request2" in exporter2._active_requests
assert "request1" not in exporter2._active_requests
Best Practices#
Performance Considerations#
Use async operations for all I/O
Implement batching for high-throughput scenarios
Use connection pooling for HTTP requests
Consider memory usage with large batches
Use IsolatedAttribute for mutable state in concurrent execution
Call create_isolated_instance() when running multiple workflows concurrently
Share expensive resources (HTTP clients, auth) across isolated instances
Error Handling#
Implement retry logic with exponential backoff
Log errors appropriately without exposing sensitive data
Gracefully handle service unavailability
Provide meaningful error messages
Resource Management#
Always implement _cleanup(): Override this method to clean up resources like HTTP sessions, file handles, and database connections
Call parent cleanup: Always call await super()._cleanup() in your override
Automatic lifecycle: The base class calls _cleanup() during shutdown - no manual calls needed
Handle cleanup errors: Wrap cleanup operations in try/except blocks to prevent shutdown failures (see the sketch below)
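A minimal sketch of a defensive _cleanup() that follows these rules (it assumes the exporter holds an aiohttp-style session attribute and a module-level logger):
    async def _cleanup(self):
        """Close resources defensively so cleanup errors never break shutdown."""
        try:
            if getattr(self, "session", None) is not None:
                await self.session.close()
        except Exception:
            # Log and continue; the base class cleanup must still run
            logger.exception("Error while closing exporter resources")
        finally:
            await super()._cleanup()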
Security#
Warning: Telemetry data may contain sensitive information from workflow executions. Never log API keys, credentials, or PII in trace data. Always use environment variables for secrets and validate/sanitize data before transmission.
Never log sensitive data like API keys
Use environment variables for credentials
Implement proper authentication
Validate input data
Monitoring#
Include metrics for export success/failure rates
Monitor batch sizes and processing times
Add health checks for external services
Log important events for debugging
Troubleshooting#
Common Issues#
Exporter not found: Ensure your exporter is properly registered and the module is imported.
Connection errors: Check endpoint URLs, authentication, and network connectivity.
Data format issues: Verify that your data transformation matches the expected format.
Performance problems: Review batching settings and connection pool configurations.
Concurrent execution issues: Ensure mutable state uses IsolatedAttribute and expensive resources are shared properly.
Debug Mode#
Enable debug logging to troubleshoot issues:
import logging
logging.getLogger("nat.observability").setLevel(logging.DEBUG)
FAQ#
Q: Which exporter type should I use?
Raw Exporter: For simple file/console output or custom processing
Span Exporter: For HTTP APIs and services that don’t support OTLP but require a span-based trace
OpenTelemetry Exporter: For OTLP-compatible services (recommended for new integrations)
Q: How do I handle authentication?
Use environment variables for credentials:
api_key: str = Field(default="", description="API key from MYSERVICE_API_KEY")
Environment variables can be configured directly in the workflow YAML configuration file through Environment Variable Interpolation
Check environment variables in registration:
api_key = config.api_key or os.environ.get("MYSERVICE_API_KEY")
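Putting both together, a registration function might resolve the key at startup. This sketch reuses the MyTelemetryExporter and MyCustomExporter names from the Best Practices section; the MYSERVICE_API_KEY variable is illustrative.
import os
@register_telemetry_exporter(config_type=MyTelemetryExporter)
async def my_telemetry_exporter(config: MyTelemetryExporter, builder: Builder):
    # Prefer the configured value, then fall back to the environment
    api_key = config.api_key or os.environ.get("MYSERVICE_API_KEY", "")
    if not api_key:
        raise ValueError("No API key provided for my_exporter")
    # Lazy import of the implementation and its plugin dependencies
    from .exporters import MyCustomExporter
    yield MyCustomExporter(endpoint=config.endpoint, api_key=api_key)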
Q: My exporter isn’t receiving events. What’s wrong?
Verify the exporter is registered and imported
Check your workflow configuration file syntax
Enable debug logging to see registration messages
Ensure the exporter type name matches your configuration
Q: How do I test my exporter?
Start with the console exporter pattern from Quick Start
Use the file exporter pattern to write traces to a local file
Test with a simple workflow before integrating with external services
Complete Example#
Implementation Overview:
1. Define Configuration Schema (TelemetryExporterBaseConfig)
- Endpoint, API key, project settings
- Use pydantic Field() for validation and description
2. Create Exporter Class (SpanExporter)
- Initialize HTTP session and headers in __init__
- Use IsolatedAttribute for concurrent state management
- Implement export_processed() with error handling
- Implement _cleanup() for resource management
3. Register with NAT (register_telemetry_exporter decorator)
- Create async factory function
- Instantiate exporter with config values
- Yield exporter instance
Here’s a complete example of a custom telemetry exporter:
import logging
from pydantic import Field
import aiohttp
from nat.builder.builder import Builder
from nat.cli.register_workflow import register_telemetry_exporter
from nat.data_models.telemetry_exporter import TelemetryExporterBaseConfig
from nat.observability.exporter.span_exporter import SpanExporter
from nat.observability.exporter.base_exporter import IsolatedAttribute
from nat.data_models.span import Span
logger = logging.getLogger(__name__)
# Configuration
class ExampleTelemetryExporter(TelemetryExporterBaseConfig, name="example"):
endpoint: str = Field(description="Service endpoint")
api_key: str = Field(description="API key")
project: str = Field(description="Project name")
# Exporter implementation (in production, define this in a separate module)
class ExampleSpanExporter(SpanExporter[Span, dict]):
# Isolated mutable state
_request_counter: IsolatedAttribute[dict] = IsolatedAttribute(
lambda: {"sent": 0, "failed": 0}
)
def __init__(self, endpoint: str, api_key: str, project: str, context_state=None):
super().__init__(context_state=context_state)
self.endpoint = endpoint
self.api_key = api_key
self.project = project
        # HTTP client as instance variable - shared via shallow copy for isolated instances
        # (in production, move this class and the aiohttp import to a separate module so the
        # dependency is only loaded when the exporter is actually used)
        self.session = aiohttp.ClientSession()
self.headers = {"Authorization": f"Bearer {self.api_key}"}
async def export_processed(self, item: dict):
payload = {"project": self.project, "span": item}
try:
async with self.session.post(
self.endpoint,
json=payload,
headers=self.headers
) as response:
if response.status == 200:
self._request_counter["sent"] += 1
else:
self._request_counter["failed"] += 1
logger.error(f"Export failed: {response.status}")
except Exception as e:
self._request_counter["failed"] += 1
logger.error(f"Export error: {e}")
async def _cleanup(self):
"""Clean up shared resources."""
await self.session.close()
await super()._cleanup()
# Registration
@register_telemetry_exporter(config_type=ExampleTelemetryExporter)
async def example_telemetry_exporter(config: ExampleTelemetryExporter, builder: Builder):
# In production, import your exporter class from a separate module:
# from .exporters import ExampleSpanExporter
exporter = ExampleSpanExporter(
endpoint=config.endpoint,
api_key=config.api_key,
project=config.project
)
yield exporter
For additional reference examples, refer to the existing exporter implementations in the toolkit source code.
Next Steps#
Explore Examples: Check the examples/observability directory for workflow examples with configured observability settings
Start Simple: Begin with the Quick Start console exporter example
Explore Supported Telemetry Exporters: Look at existing exporters in the packages/ directory
Choose Your Pattern: Select Raw, Span, or OpenTelemetry based on your needs
Test Locally: Use file output first, then integrate with your service
Add Advanced Features: Implement batching, retry logic, and error handling as needed
Add Advanced Features: Implement batching, retry logic, and error handling as needed