nat.observability.exporter.processing_exporter#

Attributes#

Classes#

ProcessingExporter

A base class for telemetry exporters with processing pipeline support.

Module Contents#

PipelineInputT#
PipelineOutputT#
logger#
class ProcessingExporter(
context_state: nat.builder.context.ContextState | None = None,
)#

Bases: Generic[PipelineInputT, PipelineOutputT], nat.observability.exporter.base_exporter.BaseExporter, nat.observability.mixin.type_introspection_mixin.TypeIntrospectionMixin

A base class for telemetry exporters with processing pipeline support.

This class extends BaseExporter to add processor pipeline functionality. It manages a chain of processors that can transform items before export.

The generic types work as follows: - PipelineInputT: The type of items that enter the processing pipeline (e.g., Span) - PipelineOutputT: The type of items after processing through the pipeline (e.g., converted format)

Key Features: - Processor pipeline management (add, remove, clear) - Type compatibility validation between processors - Pipeline processing with error handling - Automatic type validation before export

Initialize the processing exporter.

Args:

context_state: The context state to use for the exporter.

_processors: list[nat.observability.processor.processor.Processor] = []#
add_processor(
processor: nat.observability.processor.processor.Processor,
) None#

Add a processor to the processing pipeline.

Processors are executed in the order they are added. Processors can transform between any types (T -> U).

Args:

processor: The processor to add to the pipeline

remove_processor(
processor: nat.observability.processor.processor.Processor,
) None#

Remove a processor from the processing pipeline.

Args:

processor: The processor to remove from the pipeline

clear_processors() None#

Clear all processors from the pipeline.

async _pre_start() None#

Called before the exporter starts.

async _process_pipeline(item: PipelineInputT) PipelineOutputT#

Process item through all registered processors.

Args:

item (PipelineInputT): The item to process (starts as PipelineInputT, can transform to PipelineOutputT)

Returns:

PipelineOutputT: The processed item after running through all processors

async _process_through_processors(
processors: list[nat.observability.processor.processor.Processor],
item: Any,
) Any#

Process an item through a list of processors.

Args:

processors (list[Processor]): List of processors to run the item through item (Any): The item to process

Returns:

The processed item after running through all processors

async _export_final_item(
processed_item: Any,
raise_on_invalid: bool = False,
) None#

Export a processed item with proper type handling.

Args:

processed_item (Any): The item to export raise_on_invalid (bool): If True, raise ValueError for invalid types instead of logging warning

async _continue_pipeline_after(
source_processor: nat.observability.processor.processor.Processor,
item: Any,
) None#

Continue processing an item through the pipeline after a specific processor.

This is used when processors (like BatchingProcessor) need to inject items back into the pipeline flow to continue through downstream processors.

Args:

source_processor (Processor): The processor that generated the item item (Any): The item to continue processing through the remaining pipeline

async _export_with_processing(item: PipelineInputT) None#

Export an item after processing it through the pipeline.

Args:

item: The item to export

export(
event: nat.data_models.intermediate_step.IntermediateStep,
) None#

Export an IntermediateStep event through the processing pipeline.

This method converts the IntermediateStep to the expected PipelineInputT type, processes it through the pipeline, and exports the result.

Args:

event (IntermediateStep): The event to be exported.

abstractmethod export_processed(
item: PipelineOutputT | list[PipelineOutputT],
) None#
Async:

Export the processed item.

This method must be implemented by concrete exporters to handle the actual export logic after the item has been processed through the pipeline.

Args:

item: The processed item to export (PipelineOutputT type)

_create_export_task(coro: collections.abc.Coroutine)#

Create task with minimal overhead but proper tracking.

async _cleanup()#

Enhanced cleanup that shuts down all shutdown-aware processors.

Each processor is responsible for its own cleanup, including routing any final batches through the remaining pipeline via their done callbacks.