nat.observability.exporter.processing_exporter
Attributes
- PipelineInputT
- PipelineOutputT
- logger
Classes
- ProcessingExporter: A base class for telemetry exporters with processing pipeline support.
Module Contents
- PipelineInputT
- PipelineOutputT
- logger
- class ProcessingExporter(context_state: nat.builder.context.ContextState | None = None)
Bases: Generic[PipelineInputT, PipelineOutputT], nat.observability.exporter.base_exporter.BaseExporter, nat.observability.mixin.type_introspection_mixin.TypeIntrospectionMixin
A base class for telemetry exporters with processing pipeline support.
This class extends BaseExporter to add processor pipeline functionality. It manages a chain of processors that can transform items before export.
The generic types work as follows:
- PipelineInputT: The type of items that enter the processing pipeline (e.g., Span)
- PipelineOutputT: The type of items after processing through the pipeline (e.g., converted format)
Key Features:
- Processor pipeline management (add, remove, clear)
- Type compatibility validation between processors
- Pipeline processing with error handling
- Automatic type validation before export
Initialize the processing exporter.
- Args:
context_state: The context state to use for the exporter.
- _processors: list[nat.observability.processor.processor.Processor] = []
- add_processor(processor: nat.observability.processor.processor.Processor)
Add a processor to the processing pipeline.
Processors are executed in the order they are added. Processors can transform between any types (T -> U).
- Args:
processor: The processor to add to the pipeline
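The ordering rule and the type compatibility check can be illustrated with a minimal, self-contained sketch. `ToyProcessor`, `ToyPipeline`, and the `input_type`/`output_type` attributes are hypothetical stand-ins invented for this example, not the nat classes, and the compatibility rule shown (the previous output type must be a subclass of the next input type) is an assumption about how such validation might work.

```python
from typing import Any, Callable


class ToyProcessor:
    """Hypothetical stand-in for a processor; not the nat Processor class."""

    def __init__(self, input_type: type, output_type: type, fn: Callable[[Any], Any]):
        self.input_type = input_type
        self.output_type = output_type
        self.fn = fn


class ToyPipeline:
    """Keeps processors in insertion order and checks adjacent types."""

    def __init__(self) -> None:
        self.processors: list[ToyProcessor] = []

    def add_processor(self, processor: ToyProcessor) -> None:
        # Assumed rule: the new processor must accept what the previous one emits.
        if self.processors:
            previous = self.processors[-1]
            if not issubclass(previous.output_type, processor.input_type):
                raise ValueError(
                    f"{previous.output_type.__name__} output is not compatible "
                    f"with {processor.input_type.__name__} input"
                )
        self.processors.append(processor)

    def remove_processor(self, processor: ToyProcessor) -> None:
        if processor in self.processors:
            self.processors.remove(processor)

    def run(self, item: Any) -> Any:
        # Processors run in the order they were added.
        for processor in self.processors:
            item = processor.fn(item)
        return item


pipeline = ToyPipeline()
pipeline.add_processor(ToyProcessor(int, str, lambda n: f"span-{n}"))   # int -> str
pipeline.add_processor(ToyProcessor(str, bytes, lambda s: s.encode()))  # str -> bytes
result = pipeline.run(7)
```

In this sketch the item changes type at every stage (int, then str, then bytes), and appending a processor whose input type does not match the current tail of the chain raises `ValueError`.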
- remove_processor(processor: nat.observability.processor.processor.Processor)
Remove a processor from the processing pipeline.
- Args:
processor: The processor to remove from the pipeline
- async _process_pipeline(item: PipelineInputT) -> PipelineOutputT
Process item through all registered processors.
- Args:
item (PipelineInputT): The item to process (starts as PipelineInputT, can transform to PipelineOutputT)
- Returns:
PipelineOutputT: The processed item after running through all processors
- async _process_through_processors(processors: list[nat.observability.processor.processor.Processor], item: Any)
Process an item through a list of processors.
- Args:
processors (list[Processor]): List of processors to run the item through
item (Any): The item to process
- Returns:
The processed item after running through all processors
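A rough sketch of how an item might be threaded through a list of async processors follows. The processor classes and the `process` method name are assumptions made for illustration, and the error policy shown (log the failure and carry the last good value forward) is one plausible reading of "pipeline processing with error handling", not a confirmed description of the library's behavior.

```python
import asyncio
import logging
from typing import Any

logger = logging.getLogger(__name__)


class Upper:
    async def process(self, item: Any) -> Any:
        return item.upper()


class Broken:
    async def process(self, item: Any) -> Any:
        raise RuntimeError("simulated processor failure")


class Exclaim:
    async def process(self, item: Any) -> Any:
        return item + "!"


async def process_through(processors: list[Any], item: Any) -> Any:
    """Run item through each processor in order (illustrative only)."""
    current = item
    for processor in processors:
        try:
            current = await processor.process(current)
        except Exception:
            # Assumed policy: log and carry the last good value forward.
            logger.exception("Processor %s failed", type(processor).__name__)
    return current


result = asyncio.run(process_through([Upper(), Broken(), Exclaim()], "span"))
```

With this policy the failing middle processor is skipped, so `result` is `"SPAN!"`. A stricter design could re-raise instead, at the cost of losing the item entirely.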
- async _export_final_item(processed_item: Any, raise_on_invalid: bool = False)
Export a processed item with proper type handling.
- Args:
processed_item (Any): The item to export
raise_on_invalid (bool): If True, raise ValueError for invalid types instead of logging a warning
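The effect of `raise_on_invalid` can be sketched with a small standalone validator. The function name `check_exportable` and the `expected_type` parameter are hypothetical, and treating a non-empty list of valid items as exportable is an assumption based on `export_processed` accepting either a single item or a list.

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


def check_exportable(processed_item: Any, expected_type: type, raise_on_invalid: bool = False) -> bool:
    """Return True if the item, or every entry of a non-empty list, matches expected_type."""
    if isinstance(processed_item, expected_type):
        return True
    if isinstance(processed_item, list) and processed_item and all(
        isinstance(entry, expected_type) for entry in processed_item
    ):
        return True
    message = f"Cannot export item of type {type(processed_item).__name__}"
    if raise_on_invalid:
        # Strict mode: surface the problem to the caller.
        raise ValueError(message)
    # Lenient mode: record the problem and let the caller skip the export.
    logger.warning(message)
    return False
```

The lenient default suits telemetry, where a malformed item should usually not interrupt the instrumented workflow. The strict mode is more useful in tests or when a caller needs to react to the failure.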
- async _continue_pipeline_after(source_processor: nat.observability.processor.processor.Processor, item: Any)
Continue processing an item through the pipeline after a specific processor.
This is used when processors (like BatchingProcessor) need to inject items back into the pipeline flow to continue through downstream processors.
- Args:
source_processor (Processor): The processor that generated the item
item (Any): The item to continue processing through the remaining pipeline
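The idea of resuming the pipeline downstream of a given processor can be sketched as follows. The processor classes, the `process` method name, and the `continue_after` helper are hypothetical; the sketch only shows the lookup-and-slice logic and leaves out the final export step.

```python
import asyncio
from typing import Any


class AddOne:
    async def process(self, item: int) -> int:
        return item + 1


class Double:
    async def process(self, item: int) -> int:
        return item * 2


class Negate:
    async def process(self, item: int) -> int:
        return -item


async def continue_after(processors: list[Any], source: Any, item: Any) -> Any:
    """Run item through only the processors that follow source."""
    # list.index raises ValueError if source is not in the pipeline.
    start = processors.index(source) + 1
    for processor in processors[start:]:
        item = await processor.process(item)
    return item


add_one, double, negate = AddOne(), Double(), Negate()
pipeline = [add_one, double, negate]
# An item emitted by add_one skips add_one itself and flows through double and negate.
resumed = asyncio.run(continue_after(pipeline, add_one, 5))
```

Here `resumed` is `-10`, because the item re-enters after `add_one` and is only doubled and negated. If the source is the last processor, nothing downstream remains and the item is returned unchanged.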
- async _export_with_processing(item: PipelineInputT) -> None
Export an item after processing it through the pipeline.
- Args:
item: The item to export
- export(event: IntermediateStep) -> None
Export an IntermediateStep event through the processing pipeline.
This method converts the IntermediateStep to the expected PipelineInputT type, processes it through the pipeline, and exports the result.
- Args:
event (IntermediateStep): The event to be exported.
- abstractmethod async export_processed(item: PipelineOutputT | list[PipelineOutputT])
Export the processed item.
This method must be implemented by concrete exporters to handle the actual export logic after the item has been processed through the pipeline.
- Args:
item: The processed item, or list of processed items, to export (PipelineOutputT type)
- _create_export_task(coro: collections.abc.Coroutine)
Create a task with minimal overhead but proper tracking.
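One common way to get low-overhead tracking with asyncio is to keep tasks in a set and remove each one from a done callback. The sketch below shows that pattern; `TaskTracker` is a hypothetical class, and nothing here is confirmed to match the exporter's actual bookkeeping.

```python
import asyncio
from collections.abc import Coroutine
from typing import Any


class TaskTracker:
    """Illustrative tracker; the real exporter's bookkeeping may differ."""

    def __init__(self) -> None:
        self.tasks: set[asyncio.Task] = set()

    def create_export_task(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        task = asyncio.create_task(coro)
        # Hold a strong reference so the task is not garbage-collected mid-flight.
        self.tasks.add(task)
        # Drop the reference automatically once the task finishes.
        task.add_done_callback(self.tasks.discard)
        return task


async def demo() -> tuple[int, int]:
    tracker = TaskTracker()
    exported: list[int] = []

    async def fake_export(value: int) -> None:
        await asyncio.sleep(0)
        exported.append(value)

    for value in range(3):
        tracker.create_export_task(fake_export(value))
    await asyncio.gather(*tracker.tasks)
    # Yield once so any pending done callbacks get a chance to run.
    await asyncio.sleep(0)
    return len(exported), len(tracker.tasks)


counts = asyncio.run(demo())
```

The set gives the exporter something to await during shutdown, while the done callback keeps it from growing without bound.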
- async _cleanup()
Enhanced cleanup that shuts down all shutdown-aware processors.
Each processor is responsible for its own cleanup, including routing any final batches through the remaining pipeline via their done callbacks.
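The cleanup step can be pictured as "find every processor that knows how to shut down and await it". In the sketch below, detecting shutdown-aware processors via a `shutdown` method is an assumption, and `PlainProcessor`, `BufferingProcessor`, and `cleanup` are hypothetical names; routing flushed batches through the remaining pipeline is left out.

```python
import asyncio
from typing import Any


class PlainProcessor:
    async def process(self, item: Any) -> Any:
        return item


class BufferingProcessor:
    """Hypothetical shutdown-aware processor that flushes on shutdown."""

    def __init__(self) -> None:
        self.buffer: list[Any] = []
        self.flushed: list[Any] = []

    async def process(self, item: Any) -> Any:
        self.buffer.append(item)
        return item

    async def shutdown(self) -> None:
        # The processor owns its own cleanup: flush whatever is still buffered.
        self.flushed.extend(self.buffer)
        self.buffer.clear()


async def cleanup(processors: list[Any]) -> int:
    """Shut down every processor exposing shutdown(); return how many were found."""
    shutdown_calls = [p.shutdown() for p in processors if hasattr(p, "shutdown")]
    # return_exceptions=True keeps one failing shutdown from blocking the others.
    await asyncio.gather(*shutdown_calls, return_exceptions=True)
    return len(shutdown_calls)


async def demo() -> tuple[int, list[Any]]:
    buffering = BufferingProcessor()
    await buffering.process("span-1")
    count = await cleanup([PlainProcessor(), buffering])
    return count, buffering.flushed


result = asyncio.run(demo())
```

Only the buffering processor is shut down, and the item it was still holding ends up in `flushed`, which mirrors the idea that each processor is responsible for its own final batch.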