nat.observability.exporter.processing_exporter#

Attributes#

Classes#

ProcessingExporter

A base class for telemetry exporters with processing pipeline support.

Module Contents#

PipelineInputT#
PipelineOutputT#
logger#
class ProcessingExporter(
context_state: nat.builder.context.ContextState | None = None,
drop_nones: bool = True,
)#

Bases: Generic[PipelineInputT, PipelineOutputT], nat.observability.exporter.base_exporter.BaseExporter, nat.observability.mixin.type_introspection_mixin.TypeIntrospectionMixin

A base class for telemetry exporters with processing pipeline support.

This class extends BaseExporter to add processor pipeline functionality. It manages a chain of processors that can transform items before export.

The generic types work as follows:

- PipelineInputT: The type of items that enter the processing pipeline (e.g., Span)
- PipelineOutputT: The type of items after processing through the pipeline (e.g., a converted format)

Key Features:

- Processor pipeline management (add, remove, clear)
- Type compatibility validation between processors
- Pipeline processing with error handling
- Configurable None filtering: processors returning None can drop items from the pipeline
- Automatic type validation before export

Initialize the processing exporter.

Args:

context_state (ContextState | None): The context state to use for the exporter.
drop_nones (bool): Whether to drop items when processors return None (default: True).

_signature_method = '_process_pipeline'#
_processors: list[nat.observability.processor.processor.Processor] = []#
_processor_names: dict[str, int]#
_pipeline_locked: bool = False#
_drop_nones: bool = True#
add_processor(
processor: nat.observability.processor.processor.Processor,
name: str | None = None,
position: int | None = None,
before: str | None = None,
after: str | None = None,
) → None#

Add a processor to the processing pipeline.

Processors are executed in the order they are added. Processors can transform between any types (T -> U). Supports flexible positioning using names, positions, or relative placement.

Args:

processor (Processor): The processor to add to the pipeline.
name (str | None): Name for the processor (for later reference). Must be unique.
position (int | None): Specific position to insert at (0-based index, -1 for append).
before (str | None): Insert before the named processor.
after (str | None): Insert after the named processor.

Raises:

RuntimeError: If the pipeline is locked (after startup).
ValueError: If positioning arguments conflict or the named processor is not found.

remove_processor(
processor: nat.observability.processor.processor.Processor | str | int,
) → None#

Remove a processor from the processing pipeline.

Args:

processor (Processor | str | int): The processor to remove (by name, position, or object).

Raises:

RuntimeError: If the pipeline is locked (after startup).
ValueError: If the named processor or position is not found.
TypeError: If the processor argument has an invalid type.

clear_processors() → None#

Clear all processors from the pipeline.

reset_pipeline() → None#

Reset the pipeline to allow modifications.

This unlocks the pipeline and clears all processors, allowing the pipeline to be reconfigured. Can only be called when the exporter is stopped.

Raises:

RuntimeError: If exporter is currently running

get_processor_by_name(
name: str,
) → nat.observability.processor.processor.Processor | None#

Get a processor by its name.

Args:

name (str): The name of the processor to retrieve

Returns:

Processor | None: The processor with the given name, or None if not found

_check_pipeline_locked() → None#

Check if pipeline is locked and raise error if it is.

_calculate_insertion_position(
position: int | None,
before: str | None,
after: str | None,
) → int#

Calculate the insertion position based on provided arguments.

Args:

position (int | None): Explicit position (0-based index, -1 for append).
before (str | None): Insert before this named processor.
after (str | None): Insert after this named processor.

Returns:

int: The calculated insertion position

Raises:

ValueError: If arguments conflict or named processor not found

_validate_insertion_compatibility(
processor: nat.observability.processor.processor.Processor,
position: int,
) → None#

Validate type compatibility for processor insertion.

Args:

processor (Processor): The processor to insert.
position (int): The position where it will be inserted.

Raises:

ValueError: If processor is not compatible with neighbors
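The neighbor check can be illustrated with a simplified stand-in (the real class validates via Pydantic; this sketch uses plain issubclass checks): a processor inserted at `position` must accept its predecessor's output type and produce its successor's input type.

```python
from dataclasses import dataclass

@dataclass
class Proc:
    """Hypothetical stand-in for a processor with declared input/output types."""
    input_type: type
    output_type: type

def validate_insertion(pipeline: list[Proc], proc: Proc, position: int) -> None:
    if position > 0:
        pred = pipeline[position - 1]
        if not issubclass(pred.output_type, proc.input_type):
            raise ValueError("incompatible with predecessor")
    if position < len(pipeline):
        succ = pipeline[position]
        if not issubclass(proc.output_type, succ.input_type):
            raise ValueError("incompatible with successor")

pipeline = [Proc(dict, str)]
validate_insertion(pipeline, Proc(str, bytes), 1)   # ok: predecessor emits str
```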

_check_processor_compatibility(
source_processor: nat.observability.processor.processor.Processor,
target_processor: nat.observability.processor.processor.Processor,
relationship: str,
source_type: str,
target_type: str,
) → None#

Check type compatibility between two processors using Pydantic validation.

Args:

source_processor (Processor): The processor providing output.
target_processor (Processor): The processor receiving input.
relationship (str): Description of the relationship ("predecessor" or "successor").
source_type (str): String representation of the source type.
target_type (str): String representation of the target type.

async _pre_start() → None#
async _process_pipeline(item: PipelineInputT) → PipelineOutputT | None#

Process item through all registered processors.

Args:

item (PipelineInputT): The item to process (starts as PipelineInputT, can transform to PipelineOutputT)

Returns:

PipelineOutputT | None: The processed item after running through all processors

async _process_through_processors(
processors: list[nat.observability.processor.processor.Processor],
item: Any,
) → Any#

Process an item through a list of processors.

Args:

processors (list[Processor]): List of processors to run the item through.
item (Any): The item to process.

Returns:
Any: The processed item after running through all processors, or None if drop_nones is True and any processor returned None.

async _export_final_item(
processed_item: Any,
raise_on_invalid: bool = False,
) → None#

Export a processed item with proper type handling.

Args:

processed_item (Any): The item to export.
raise_on_invalid (bool): If True, raise ValueError for invalid types instead of logging a warning.

async _continue_pipeline_after(
source_processor: nat.observability.processor.processor.Processor,
item: Any,
) → None#

Continue processing an item through the pipeline after a specific processor.

This is used when processors (like BatchingProcessor) need to inject items back into the pipeline flow to continue through downstream processors.

Args:

source_processor (Processor): The processor that generated the item.
item (Any): The item to continue processing through the remaining pipeline.

async _export_with_processing(item: PipelineInputT) → None#

Export an item after processing it through the pipeline.

Args:

item (PipelineInputT): The item to export

export(
event: nat.data_models.intermediate_step.IntermediateStep,
) → None#

Export an IntermediateStep event through the processing pipeline.

This method converts the IntermediateStep to the expected PipelineInputT type, processes it through the pipeline, and exports the result.

Args:

event (IntermediateStep): The event to be exported.

abstractmethod export_processed(
item: PipelineOutputT | list[PipelineOutputT],
) → None#
Async:

Export the processed item.

This method must be implemented by concrete exporters to handle the actual export logic after the item has been processed through the pipeline.

Args:

item (PipelineOutputT | list[PipelineOutputT]): The processed item, or batch of items, to export.
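The shape of a concrete implementation can be sketched with a standalone analogue (hypothetical, not a real `nat` subclass): export_processed accepts either a single item or a batch, normalizes to a list, and ships the results.

```python
import asyncio
import json

class ConsoleExporter:
    """Hypothetical concrete exporter: export_processed handles the final item(s)."""

    async def export_processed(self, item):
        # Normalize single items and batches to a list, then "ship" them
        # (here: serialize to JSON strings instead of sending to a backend).
        items = item if isinstance(item, list) else [item]
        return [json.dumps(i) for i in items]

lines = asyncio.run(ConsoleExporter().export_processed({"span": "query"}))
print(lines)  # ['{"span": "query"}']
```

A real exporter would typically forward the serialized items to an OTLP endpoint, file, or message queue at this point.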

_create_export_task(coro: collections.abc.Coroutine) → None#

Create task with minimal overhead but proper tracking.

Handles the race condition where stop() may be called between the running check and task creation, or where the event loop may be shutting down. In these cases the coroutine is closed to prevent ‘coroutine was never awaited’ warnings and the error is logged rather than propagated.

Args:

coro: The coroutine to create a task for
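The race-condition guard can be sketched as follows (a simplified stand-in for the method, with the running state passed in explicitly): when the exporter has stopped, the coroutine is closed instead of scheduled, which suppresses Python's "coroutine was never awaited" warning.

```python
import asyncio

def create_export_task(coro, running: bool, tasks: set) -> None:
    # If the exporter stopped between the caller's check and this call,
    # close the coroutine so Python does not warn that it was never awaited.
    if not running:
        coro.close()
        return
    task = asyncio.ensure_future(coro)
    tasks.add(task)                        # track for later cleanup
    task.add_done_callback(tasks.discard)  # untrack once the task finishes

async def work():
    return "exported"

# Stopped exporter: the coroutine is closed, never scheduled, no warning.
create_export_task(work(), running=False, tasks=set())

# Running exporter: the task is created and tracked until completion.
async def main():
    tasks: set = set()
    create_export_task(work(), running=True, tasks=tasks)
    return await asyncio.gather(*tasks)

print(asyncio.run(main()))  # ['exported']
```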

async _cleanup() → None#

Enhanced cleanup that shuts down all shutdown-aware processors.

Each processor is responsible for its own cleanup, including routing any final batches through the remaining pipeline via their done callbacks.