nat.observability.exporter.processing_exporter#

Attributes#

Classes#

ProcessingExporter

A base class for telemetry exporters with processing pipeline support.

Module Contents#

PipelineInputT#
PipelineOutputT#
logger#
class ProcessingExporter(
context_state: nat.builder.context.ContextState | None = None,
drop_nones: bool = True,
)#

Bases: Generic[PipelineInputT, PipelineOutputT], nat.observability.exporter.base_exporter.BaseExporter, nat.observability.mixin.type_introspection_mixin.TypeIntrospectionMixin

A base class for telemetry exporters with processing pipeline support.

This class extends BaseExporter to add processor pipeline functionality. It manages a chain of processors that can transform items before export.

The generic types work as follows:

- PipelineInputT: The type of items that enter the processing pipeline (e.g., Span)
- PipelineOutputT: The type of items after processing through the pipeline (e.g., converted format)

Key Features:

- Processor pipeline management (add, remove, clear)
- Type compatibility validation between processors
- Pipeline processing with error handling
- Configurable None filtering: processors returning None can drop items from the pipeline
- Automatic type validation before export

Initialize the processing exporter.

Args:

context_state (ContextState | None): The context state to use for the exporter.

drop_nones (bool): Whether to drop items when processors return None (default: True).

_signature_method = '_process_pipeline'#
_processors: list[nat.observability.processor.processor.Processor] = []#
_processor_names: dict[str, int]#
_pipeline_locked: bool = False#
_drop_nones: bool = True#
add_processor(
processor: nat.observability.processor.processor.Processor,
name: str | None = None,
position: int | None = None,
before: str | None = None,
after: str | None = None,
) None#

Add a processor to the processing pipeline.

Processors are executed in the order they are added. Processors can transform between any types (T -> U). Supports flexible positioning using names, positions, or relative placement.

Args:

processor (Processor): The processor to add to the pipeline.

name (str | None): Name for the processor (for later reference). Must be unique.

position (int | None): Specific position to insert at (0-based index, -1 for append).

before (str | None): Insert before the named processor.

after (str | None): Insert after the named processor.

Raises:

RuntimeError: If pipeline is locked (after startup).

ValueError: If positioning arguments conflict or named processor not found.

remove_processor(
processor: nat.observability.processor.processor.Processor | str | int,
) None#

Remove a processor from the processing pipeline.

Args:

processor (Processor | str | int): The processor to remove (by name, position, or object).

Raises:

RuntimeError: If pipeline is locked (after startup).

ValueError: If named processor or position not found.

TypeError: If processor argument has invalid type.

clear_processors() None#

Clear all processors from the pipeline.

reset_pipeline() None#

Reset the pipeline to allow modifications.

This unlocks the pipeline and clears all processors, allowing the pipeline to be reconfigured. Can only be called when the exporter is stopped.

Raises:

RuntimeError: If exporter is currently running

get_processor_by_name(
name: str,
) nat.observability.processor.processor.Processor | None#

Get a processor by its name.

Args:

name (str): The name of the processor to retrieve

Returns:

Processor | None: The processor with the given name, or None if not found

_check_pipeline_locked() None#

Check if pipeline is locked and raise error if it is.

_calculate_insertion_position(
position: int | None,
before: str | None,
after: str | None,
) int#

Calculate the insertion position based on provided arguments.

Args:

position (int | None): Explicit position (0-based index, -1 for append).

before (str | None): Insert before this named processor.

after (str | None): Insert after this named processor.

Returns:

int: The calculated insertion position

Raises:

ValueError: If arguments conflict or named processor not found

_validate_insertion_compatibility(
processor: nat.observability.processor.processor.Processor,
position: int,
) None#

Validate type compatibility for processor insertion.

Args:

processor (Processor): The processor to insert.

position (int): The position where it will be inserted.

Raises:

ValueError: If processor is not compatible with neighbors

_check_processor_compatibility(
source_processor: nat.observability.processor.processor.Processor,
target_processor: nat.observability.processor.processor.Processor,
relationship: str,
source_type: str,
target_type: str,
) None#

Check type compatibility between two processors using Pydantic validation.

Args:

source_processor (Processor): The processor providing output.

target_processor (Processor): The processor receiving input.

relationship (str): Description of relationship (“predecessor” or “successor”).

source_type (str): String representation of source type.

target_type (str): String representation of target type.

async _pre_start() None#

Called before the exporter starts.

async _process_pipeline(item: PipelineInputT) PipelineOutputT | None#

Process item through all registered processors.

Args:

item (PipelineInputT): The item to process (starts as PipelineInputT, can transform to PipelineOutputT)

Returns:

PipelineOutputT | None: The processed item after running through all processors

async _process_through_processors(
processors: list[nat.observability.processor.processor.Processor],
item: Any,
) Any#

Process an item through a list of processors.

Args:

processors (list[Processor]): List of processors to run the item through.

item (Any): The item to process.

Returns:
Any: The processed item after running through all processors, or None if drop_nones is True and any processor returned None

async _export_final_item(
processed_item: Any,
raise_on_invalid: bool = False,
) None#

Export a processed item with proper type handling.

Args:

processed_item (Any): The item to export.

raise_on_invalid (bool): If True, raise ValueError for invalid types instead of logging warning.

async _continue_pipeline_after(
source_processor: nat.observability.processor.processor.Processor,
item: Any,
) None#

Continue processing an item through the pipeline after a specific processor.

This is used when processors (like BatchingProcessor) need to inject items back into the pipeline flow to continue through downstream processors.

Args:

source_processor (Processor): The processor that generated the item.

item (Any): The item to continue processing through the remaining pipeline.

async _export_with_processing(item: PipelineInputT) None#

Export an item after processing it through the pipeline.

Args:

item (PipelineInputT): The item to export

export(
event: nat.data_models.intermediate_step.IntermediateStep,
) None#

Export an IntermediateStep event through the processing pipeline.

This method converts the IntermediateStep to the expected PipelineInputT type, processes it through the pipeline, and exports the result.

Args:

event (IntermediateStep): The event to be exported.

abstractmethod export_processed(
item: PipelineOutputT | list[PipelineOutputT],
) None#
Async:

Export the processed item.

This method must be implemented by concrete exporters to handle the actual export logic after the item has been processed through the pipeline.

Args:

item (PipelineOutputT | list[PipelineOutputT]): The processed item to export (PipelineOutputT type)

_create_export_task(coro: collections.abc.Coroutine) None#

Create task with minimal overhead but proper tracking.

Args:

coro: The coroutine to create a task for

async _cleanup() None#

Enhanced cleanup that shuts down all shutdown-aware processors.

Each processor is responsible for its own cleanup, including routing any final batches through the remaining pipeline via their done callbacks.