nat.observability.exporter.processing_exporter#
Attributes#
Classes#
A base class for telemetry exporters with processing pipeline support. |
Module Contents#
- PipelineInputT#
- PipelineOutputT#
- logger#
- class ProcessingExporter(
- context_state: nat.builder.context.ContextState | None = None,
- drop_nones: bool = True,
Bases:
Generic[PipelineInputT,PipelineOutputT],nat.observability.exporter.base_exporter.BaseExporter,nat.observability.mixin.type_introspection_mixin.TypeIntrospectionMixinA base class for telemetry exporters with processing pipeline support.
This class extends BaseExporter to add processor pipeline functionality. It manages a chain of processors that can transform items before export.
The generic types work as follows: - PipelineInputT: The type of items that enter the processing pipeline (e.g., Span) - PipelineOutputT: The type of items after processing through the pipeline (e.g., converted format)
Key Features: - Processor pipeline management (add, remove, clear) - Type compatibility validation between processors - Pipeline processing with error handling - Configurable None filtering: processors returning None can drop items from pipeline - Automatic type validation before export
Initialize the processing exporter.
- Args:
context_state (ContextState | None): The context state to use for the exporter. drop_nones (bool): Whether to drop items when processors return None (default: True).
- _signature_method = '_process_pipeline'#
- _processors: list[nat.observability.processor.processor.Processor] = []#
- add_processor(
- processor: nat.observability.processor.processor.Processor,
- name: str | None = None,
- position: int | None = None,
- before: str | None = None,
- after: str | None = None,
Add a processor to the processing pipeline.
Processors are executed in the order they are added. Processes can transform between any types (T -> U). Supports flexible positioning using names, positions, or relative placement.
- Args:
processor (Processor): The processor to add to the pipeline name (str | None): Name for the processor (for later reference). Must be unique. position (int | None): Specific position to insert at (0-based index, -1 for append) before (str | None): Insert before the named processor after (str | None): Insert after the named processor
- Raises:
RuntimeError: If pipeline is locked (after startup) ValueError: If positioning arguments conflict or named processor not found
- remove_processor(
- processor: nat.observability.processor.processor.Processor | str | int,
Remove a processor from the processing pipeline.
- Args:
processor (Processor | str | int): The processor to remove (by name, position, or object).
- Raises:
RuntimeError: If pipeline is locked (after startup) ValueError: If named processor or position not found TypeError: If processor argument has invalid type
- reset_pipeline() None#
Reset the pipeline to allow modifications.
This unlocks the pipeline and clears all processors, allowing the pipeline to be reconfigured. Can only be called when the exporter is stopped.
- Raises:
RuntimeError: If exporter is currently running
- get_processor_by_name(
- name: str,
Get a processor by its name.
- Args:
name (str): The name of the processor to retrieve
- Returns:
Processor | None: The processor with the given name, or None if not found
- _calculate_insertion_position( ) int#
Calculate the insertion position based on provided arguments.
- Args:
position (int | None): Explicit position (0-based index, -1 for append) before (str | None): Insert before this named processor after (str | None): Insert after this named processor
- Returns:
int: The calculated insertion position
- Raises:
ValueError: If arguments conflict or named processor not found
- _validate_insertion_compatibility(
- processor: nat.observability.processor.processor.Processor,
- position: int,
Validate type compatibility for processor insertion.
- Args:
processor (Processor): The processor to insert position (int): The position where it will be inserted
- Raises:
ValueError: If processor is not compatible with neighbors
- _check_processor_compatibility(
- source_processor: nat.observability.processor.processor.Processor,
- target_processor: nat.observability.processor.processor.Processor,
- relationship: str,
- source_type: str,
- target_type: str,
Check type compatibility between two processors using Pydantic validation.
- Args:
source_processor (Processor): The processor providing output target_processor (Processor): The processor receiving input relationship (str): Description of relationship (“predecessor” or “successor”) source_type (str): String representation of source type target_type (str): String representation of target type
- async _process_pipeline(item: PipelineInputT) PipelineOutputT | None#
Process item through all registered processors.
- Args:
item (PipelineInputT): The item to process (starts as PipelineInputT, can transform to PipelineOutputT)
- Returns:
PipelineOutputT | None: The processed item after running through all processors
- async _process_through_processors(
- processors: list[nat.observability.processor.processor.Processor],
- item: Any,
Process an item through a list of processors.
- Args:
processors (list[Processor]): List of processors to run the item through item (Any): The item to process
- Returns:
- Any: The processed item after running through all processors, or None if
drop_nones is True and any processor returned None
- async _export_final_item(
- processed_item: Any,
- raise_on_invalid: bool = False,
Export a processed item with proper type handling.
- Args:
processed_item (Any): The item to export raise_on_invalid (bool): If True, raise ValueError for invalid types instead of logging warning
- async _continue_pipeline_after(
- source_processor: nat.observability.processor.processor.Processor,
- item: Any,
Continue processing an item through the pipeline after a specific processor.
This is used when processors (like BatchingProcessor) need to inject items back into the pipeline flow to continue through downstream processors.
- Args:
source_processor (Processor): The processor that generated the item item (Any): The item to continue processing through the remaining pipeline
- async _export_with_processing(item: PipelineInputT) None#
Export an item after processing it through the pipeline.
- Args:
item (PipelineInputT): The item to export
- export( ) None#
Export an IntermediateStep event through the processing pipeline.
This method converts the IntermediateStep to the expected PipelineInputT type, processes it through the pipeline, and exports the result.
- Args:
event (IntermediateStep): The event to be exported.
- abstractmethod export_processed(
- item: PipelineOutputT | list[PipelineOutputT],
- Async:
Export the processed item.
This method must be implemented by concrete exporters to handle the actual export logic after the item has been processed through the pipeline.
- Args:
item (PipelineOutputT | list[PipelineOutputT]): The processed item to export (PipelineOutputT type)
- _create_export_task(coro: collections.abc.Coroutine) None#
Create task with minimal overhead but proper tracking.
Handles the race condition where stop() may be called between the running check and task creation, or where the event loop may be shutting down. In these cases the coroutine is closed to prevent ‘coroutine was never awaited’ warnings and the error is logged rather than propagated.
- Args:
coro: The coroutine to create a task for