nat.observability.exporter.processing_exporter#
Attributes#
Classes#
ProcessingExporter | A base class for telemetry exporters with processing pipeline support.
Module Contents#
- PipelineInputT#
- PipelineOutputT#
- logger#
- class ProcessingExporter(
- context_state: nat.builder.context.ContextState | None = None,
- drop_nones: bool = True,
Bases:
Generic[PipelineInputT, PipelineOutputT], nat.observability.exporter.base_exporter.BaseExporter, nat.observability.mixin.type_introspection_mixin.TypeIntrospectionMixin
A base class for telemetry exporters with processing pipeline support.
This class extends BaseExporter to add processor pipeline functionality. It manages a chain of processors that can transform items before export.
The generic types work as follows:
- PipelineInputT: The type of items that enter the processing pipeline (e.g., Span)
- PipelineOutputT: The type of items after processing through the pipeline (e.g., converted format)
Key Features:
- Processor pipeline management (add, remove, clear)
- Type compatibility validation between processors
- Pipeline processing with error handling
- Configurable None filtering: processors returning None can drop items from the pipeline
- Automatic type validation before export
Initialize the processing exporter.
- Args:
context_state (ContextState | None): The context state to use for the exporter.
drop_nones (bool): Whether to drop items when processors return None (default: True).
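The None filtering described above can be illustrated with a minimal, self-contained sketch. It does not use the real `Processor` class: each "processor" is a plain async callable, and `run_pipeline`, `to_upper`, `drop_short`, and `to_record` are hypothetical names introduced only for this illustration.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical stand-in: a "processor" here is just an async callable.
Step = Callable[[Any], Awaitable[Any]]


async def run_pipeline(steps: list[Step], item: Any, drop_nones: bool = True) -> Any:
    """Pass an item through each step in order; the type may change along the way."""
    for step in steps:
        item = await step(item)
        # With drop_nones enabled, a None result drops the item and ends the chain.
        if item is None and drop_nones:
            return None
    return item


async def to_upper(text: str) -> str:
    return text.upper()


async def drop_short(text: str) -> str | None:
    # Returning None signals that the item should be filtered out.
    return text if len(text) > 3 else None


async def to_record(text: str) -> dict[str, str]:
    # The final step converts str to dict, mirroring PipelineInputT -> PipelineOutputT.
    return {"name": text}


PIPELINE = [to_upper, drop_short, to_record]

kept = asyncio.run(run_pipeline(PIPELINE, "span"))
dropped = asyncio.run(run_pipeline(PIPELINE, "abc"))
print(kept)
print(dropped)
```

In this sketch the four-character input reaches the final step, while the three-character input is dropped before conversion.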
- _signature_method = '_process_pipeline'#
- _processors: list[nat.observability.processor.processor.Processor] = []#
- add_processor(
- processor: nat.observability.processor.processor.Processor,
- name: str | None = None,
- position: int | None = None,
- before: str | None = None,
- after: str | None = None,
Add a processor to the processing pipeline.
Processors are executed in the order they are added. Processors can transform between any types (T -> U). Supports flexible positioning using names, positions, or relative placement.
- Args:
processor (Processor): The processor to add to the pipeline
name (str | None): Name for the processor (for later reference). Must be unique.
position (int | None): Specific position to insert at (0-based index, -1 for append)
before (str | None): Insert before the named processor
after (str | None): Insert after the named processor
- Raises:
RuntimeError: If pipeline is locked (after startup)
ValueError: If positioning arguments conflict or named processor not found
- remove_processor(
- processor: nat.observability.processor.processor.Processor | str | int,
Remove a processor from the processing pipeline.
- Args:
processor (Processor | str | int): The processor to remove (by name, position, or object).
- Raises:
RuntimeError: If pipeline is locked (after startup)
ValueError: If named processor or position not found
TypeError: If processor argument has invalid type
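The three accepted argument forms (name, position, object) and their error cases can be sketched as a lookup function. This is an illustrative sketch only: `Step` and `resolve_index` are hypothetical names, and raising ValueError for an object that is not in the pipeline is an assumption, since the reference above does not state that case.

```python
from __future__ import annotations


class Step:
    """Hypothetical stand-in for a Processor object."""

    def __init__(self, label: str) -> None:
        self.label = label


def resolve_index(steps: list[Step], names: dict[str, Step], target: Step | str | int) -> int:
    """Return the pipeline index that a name, position, or object refers to."""
    if isinstance(target, str):
        if target not in names:
            raise ValueError(f"no processor named {target!r}")
        target = names[target]  # continue with the object lookup below
    elif isinstance(target, int):
        if not 0 <= target < len(steps):
            raise ValueError(f"position {target} is out of range")
        return target
    elif not isinstance(target, Step):
        raise TypeError(f"unsupported argument type: {type(target).__name__}")
    for index, step in enumerate(steps):
        if step is target:
            return index
    # Assumption: an object that is not registered is reported as ValueError.
    raise ValueError("processor is not in the pipeline")


parse, batch = Step("parse"), Step("batch")
steps = [parse, batch]
names = {"batch": batch}  # only named processors can be looked up by name

print(resolve_index(steps, names, "batch"))
print(resolve_index(steps, names, 0))
print(resolve_index(steps, names, parse))
```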
- reset_pipeline() None#
Reset the pipeline to allow modifications.
This unlocks the pipeline and clears all processors, allowing the pipeline to be reconfigured. Can only be called when the exporter is stopped.
- Raises:
RuntimeError: If exporter is currently running
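The lock and reset behaviour can be pictured with a small state sketch. `PipelineState` and its methods are hypothetical and do not mirror the real class. The sketch also assumes that startup sets the lock and that stopping alone does not clear it, which the description above implies but does not state outright.

```python
class PipelineState:
    """Hypothetical stand-in for the exporter's pipeline bookkeeping."""

    def __init__(self) -> None:
        self.processors: list[str] = []
        self.locked = False
        self.running = False

    def add(self, name: str) -> None:
        if self.locked:
            raise RuntimeError("pipeline is locked; reset it before modifying")
        self.processors.append(name)

    def start(self) -> None:
        self.running = True
        self.locked = True  # assumption: startup is what locks the pipeline

    def stop(self) -> None:
        self.running = False  # assumption: the lock stays in place until reset

    def reset(self) -> None:
        if self.running:
            raise RuntimeError("cannot reset while the exporter is running")
        self.processors.clear()  # reset clears every processor
        self.locked = False      # and unlocks the pipeline


state = PipelineState()
state.add("parse")
state.start()
state.stop()
state.reset()
state.add("batch")  # allowed again after the reset
print(state.processors)
```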
- get_processor_by_name(
- name: str,
Get a processor by its name.
- Args:
name (str): The name of the processor to retrieve
- Returns:
Processor | None: The processor with the given name, or None if not found
- _calculate_insertion_position( ) int#
Calculate the insertion position based on provided arguments.
- Args:
position (int | None): Explicit position (0-based index, -1 for append)
before (str | None): Insert before this named processor
after (str | None): Insert after this named processor
- Returns:
int: The calculated insertion position
- Raises:
ValueError: If arguments conflict or named processor not found
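One way to read the positioning rules is sketched below. The function `insertion_position` is hypothetical and works on a plain list of processor names. Treating an out-of-range `position` as a ValueError and appending when no argument is given are assumptions of this sketch, not documented behaviour.

```python
from __future__ import annotations


def insertion_position(
    names: list[str],
    position: int | None = None,
    before: str | None = None,
    after: str | None = None,
) -> int:
    """Compute a 0-based insertion index from at most one positioning argument."""
    given = [arg for arg in (position, before, after) if arg is not None]
    if len(given) > 1:
        raise ValueError("use only one of position, before, or after")
    if position is not None:
        if position == -1:
            return len(names)  # -1 means append
        if not 0 <= position <= len(names):
            raise ValueError(f"position {position} is out of range")  # assumption
        return position
    if before is not None:
        if before not in names:
            raise ValueError(f"no processor named {before!r}")
        return names.index(before)
    if after is not None:
        if after not in names:
            raise ValueError(f"no processor named {after!r}")
        return names.index(after) + 1
    return len(names)  # assumption: with no arguments the processor is appended


names = ["parse", "batch"]
print(insertion_position(names, before="batch"))
print(insertion_position(names, after="parse"))
print(insertion_position(names, position=-1))
```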
- _validate_insertion_compatibility(
- processor: nat.observability.processor.processor.Processor,
- position: int,
Validate type compatibility for processor insertion.
- Args:
processor (Processor): The processor to insert
position (int): The position where it will be inserted
- Raises:
ValueError: If processor is not compatible with neighbors
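The neighbor check can be approximated as follows. This sketch deliberately swaps in a plain `issubclass` test rather than the Pydantic-based validation the class uses, and `validate_neighbors` with its parameters is a hypothetical name, so it shows the idea rather than the actual rules.

```python
from __future__ import annotations


def validate_neighbors(
    new_input: type,
    new_output: type,
    predecessor_output: type | None = None,
    successor_input: type | None = None,
) -> None:
    """Raise ValueError if a new processor does not fit between its neighbors."""
    # The predecessor's output must be acceptable as the new processor's input.
    if predecessor_output is not None and not issubclass(predecessor_output, new_input):
        raise ValueError(
            f"predecessor outputs {predecessor_output.__name__}, "
            f"but the new processor expects {new_input.__name__}"
        )
    # The new processor's output must be acceptable as the successor's input.
    if successor_input is not None and not issubclass(new_output, successor_input):
        raise ValueError(
            f"new processor outputs {new_output.__name__}, "
            f"but the successor expects {successor_input.__name__}"
        )


# A str -> dict processor fits between a str producer and a dict consumer.
validate_neighbors(str, dict, predecessor_output=str, successor_input=dict)

try:
    validate_neighbors(int, dict, predecessor_output=str)
except ValueError as error:
    mismatch = str(error)
    print(mismatch)
```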
- _check_processor_compatibility(
- source_processor: nat.observability.processor.processor.Processor,
- target_processor: nat.observability.processor.processor.Processor,
- relationship: str,
- source_type: str,
- target_type: str,
Check type compatibility between two processors using Pydantic validation.
- Args:
source_processor (Processor): The processor providing output
target_processor (Processor): The processor receiving input
relationship (str): Description of relationship (“predecessor” or “successor”)
source_type (str): String representation of source type
target_type (str): String representation of target type
- async _process_pipeline(item: PipelineInputT) PipelineOutputT | None#
Process item through all registered processors.
- Args:
item (PipelineInputT): The item to process (starts as PipelineInputT, can transform to PipelineOutputT)
- Returns:
PipelineOutputT | None: The processed item after running through all processors
- async _process_through_processors(
- processors: list[nat.observability.processor.processor.Processor],
- item: Any,
Process an item through a list of processors.
- Args:
processors (list[Processor]): List of processors to run the item through
item (Any): The item to process
- Returns:
Any: The processed item after running through all processors, or None if drop_nones is True and any processor returned None
- async _export_final_item(
- processed_item: Any,
- raise_on_invalid: bool = False,
Export a processed item with proper type handling.
- Args:
processed_item (Any): The item to export
raise_on_invalid (bool): If True, raise ValueError for invalid types instead of logging warning
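The effect of `raise_on_invalid` can be sketched with a small type gate. `is_exportable` and its `expected` parameter are hypothetical. Accepting either a single item or a list of items follows the `export_processed` signature, while the exact checks the real method performs are not shown in this reference.

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


def is_exportable(item: Any, expected: type, raise_on_invalid: bool = False) -> bool:
    """Return True if item (or every element of a list) matches the expected type."""
    if isinstance(item, expected):
        return True
    if isinstance(item, list) and all(isinstance(entry, expected) for entry in item):
        return True  # note: an empty list passes this check
    message = f"cannot export {type(item).__name__}; expected {expected.__name__}"
    if raise_on_invalid:
        raise ValueError(message)
    logger.warning(message)  # default: log and skip the item
    return False


print(is_exportable({"name": "span"}, dict))
print(is_exportable([{"a": 1}, {"b": 2}], dict))
print(is_exportable("not a dict", dict))
```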
- async _continue_pipeline_after(
- source_processor: nat.observability.processor.processor.Processor,
- item: Any,
Continue processing an item through the pipeline after a specific processor.
This is used when processors (like BatchingProcessor) need to inject items back into the pipeline flow to continue through downstream processors.
- Args:
source_processor (Processor): The processor that generated the item
item (Any): The item to continue processing through the remaining pipeline
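Re-entering the pipeline downstream of a given processor amounts to running only the processors that follow it. The sketch below uses plain async callables, and the names `continue_after`, `batch`, `add_one`, and `double` are hypothetical. It leaves out None filtering and the final export step.

```python
import asyncio
from typing import Any, Awaitable, Callable

Step = Callable[[Any], Awaitable[Any]]  # hypothetical stand-in for a processor


async def continue_after(steps: list[Step], source: Step, item: Any) -> Any:
    """Run item through only the steps that come after source."""
    start = steps.index(source) + 1
    for step in steps[start:]:
        item = await step(item)
    return item


async def batch(value: int) -> int:
    return value  # placeholder for a processor that emits items later


async def add_one(value: int) -> int:
    return value + 1


async def double(value: int) -> int:
    return value * 2


steps = [batch, add_one, double]

# An item released by `batch` skips `batch` itself and resumes at `add_one`.
resumed = asyncio.run(continue_after(steps, batch, 3))
print(resumed)
```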
- async _export_with_processing(item: PipelineInputT) None#
Export an item after processing it through the pipeline.
- Args:
item (PipelineInputT): The item to export
- export( ) None#
Export an IntermediateStep event through the processing pipeline.
This method converts the IntermediateStep to the expected PipelineInputT type, processes it through the pipeline, and exports the result.
- Args:
event (IntermediateStep): The event to be exported.
- abstractmethod export_processed(
- item: PipelineOutputT | list[PipelineOutputT],
- Async:
Export the processed item.
This method must be implemented by concrete exporters to handle the actual export logic after the item has been processed through the pipeline.
- Args:
item (PipelineOutputT | list[PipelineOutputT]): The processed item, or a list of processed items, to export
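A concrete exporter supplies `export_processed` and has to cope with both a single item and a list. The sketch below shows that shape with a stand-in abstract base. `SketchExporter` and `InMemoryExporter` are hypothetical names, and the real base class has further requirements that are not reproduced here.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod


class SketchExporter(ABC):
    """Hypothetical stand-in for ProcessingExporter's abstract interface."""

    @abstractmethod
    async def export_processed(self, item: str | list[str]) -> None:
        """Send already-processed items to their destination."""


class InMemoryExporter(SketchExporter):
    """Collects exported items in a list instead of sending them anywhere."""

    def __init__(self) -> None:
        self.sent: list[str] = []

    async def export_processed(self, item: str | list[str]) -> None:
        # Normalize so that single items and batches share one code path.
        batch = item if isinstance(item, list) else [item]
        self.sent.extend(batch)


exporter = InMemoryExporter()
asyncio.run(exporter.export_processed("a"))
asyncio.run(exporter.export_processed(["b", "c"]))
print(exporter.sent)
```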
- _create_export_task(coro: collections.abc.Coroutine) None#
Create task with minimal overhead but proper tracking.
- Args:
coro: The coroutine to create a task for
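Task tracking of this kind is commonly done in asyncio by holding each task in a set until it finishes. The sketch below shows that general pattern. Whether `_create_export_task` is implemented this way is an assumption, and `TaskTracker` is a hypothetical name.

```python
import asyncio
from collections.abc import Coroutine
from typing import Any


class TaskTracker:
    """Keeps a strong reference to each task until it completes."""

    def __init__(self) -> None:
        self.tasks: set[asyncio.Task] = set()

    def create(self, coro: Coroutine[Any, Any, Any]) -> None:
        task = asyncio.create_task(coro)
        self.tasks.add(task)  # prevents the task from being garbage-collected early
        task.add_done_callback(self.tasks.discard)  # drop the reference when done


async def demo() -> tuple[int, int, list[int]]:
    tracker = TaskTracker()
    results: list[int] = []

    async def work(number: int) -> None:
        await asyncio.sleep(0)
        results.append(number)

    for number in range(3):
        tracker.create(work(number))
    pending = len(tracker.tasks)
    await asyncio.gather(*tracker.tasks)
    await asyncio.sleep(0)  # give done callbacks a chance to run
    return pending, len(tracker.tasks), sorted(results)


pending, remaining, results = asyncio.run(demo())
print(pending, remaining, results)
```

Holding the tasks in a set also gives shutdown code something to await, so in-flight exports can finish before the exporter stops.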