nat.observability.processor.batching_processor#

Attributes#

Classes#

BatchingProcessor

Pass-through batching processor that accumulates items and outputs batched lists.

Module Contents#

logger#
T#
class BatchingProcessor(
batch_size: int = 100,
flush_interval: float = 5.0,
max_queue_size: int = 1000,
drop_on_overflow: bool = False,
shutdown_timeout: float = 10.0,
)#

Bases: nat.observability.processor.callback_processor.CallbackProcessor[T, list[T]], Generic[T]

Pass-through batching processor that accumulates items and outputs batched lists.

This processor extends CallbackProcessor[T, List[T]] to provide batching functionality. It accumulates individual items and outputs them as batches when size or time thresholds are met. The batched output continues through the processing pipeline.

CRITICAL: Implements proper cleanup to ensure NO ITEMS ARE LOST during shutdown. The ProcessingExporter._cleanup() method calls shutdown() on all processors.

Key Features:
- Pass-through design: Processor[T, List[T]]
- Size-based and time-based batching
- Pipeline flow: batches continue through downstream processors
- GUARANTEED: No items lost during cleanup
- Comprehensive statistics and monitoring
- Proper cleanup and shutdown handling
- High-performance async implementation
- Back-pressure handling with queue limits

Pipeline Flow:

- Normal processing: Individual items → BatchingProcessor → List[items] → downstream processors → export
- Time-based flush: Scheduled batches automatically continue through the remaining pipeline
- Shutdown: Final batch immediately routed through the remaining pipeline

Cleanup Guarantee:

When shutdown() is called, this processor:
1. Stops accepting new items
2. Creates a final batch from all queued items
3. Immediately routes the final batch through the remaining pipeline via the callback
4. Ensures zero data loss with no external coordination needed
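The zero-loss shutdown pattern described above can be sketched in isolation. The following is a minimal, self-contained illustration (not the library's actual implementation); the bare `deque` and `done_callback` stand in for the processor's internal queue and pipeline callback:

```python
import asyncio
from collections import deque

async def shutdown_with_final_batch(queue: deque, done_callback) -> None:
    """Sketch of the zero-loss shutdown pattern (illustrative only)."""
    # 1. Stop accepting new items (the real processor sets a flag that
    #    process() checks before enqueueing).
    # 2. Create the final batch from everything still queued.
    final_batch = list(queue)
    queue.clear()
    # 3. Immediately route the final batch through the remaining pipeline.
    if final_batch:
        await done_callback(final_batch)
    # 4. Nothing is left behind: zero data loss, no external coordination.

received = []

async def downstream(batch):
    received.append(batch)

queue = deque([1, 2, 3])
asyncio.run(shutdown_with_final_batch(queue, downstream))
# received == [[1, 2, 3]], and the queue is now empty
```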

Usage in Pipeline:

```python
# Individual spans → batched spans → continue through downstream processors
exporter.add_processor(BatchingProcessor[Span](batch_size=100))  # Auto-wired with pipeline callback
exporter.add_processor(FilterProcessor())     # Processes List[Span] from batching
exporter.add_processor(TransformProcessor())  # Further processing
```

Args:

- batch_size: Maximum items per batch (default: 100)
- flush_interval: Maximum seconds to wait before flushing (default: 5.0)
- max_queue_size: Maximum items to queue before blocking (default: 1000)
- drop_on_overflow: If True, drop items when the queue is full (default: False)
- shutdown_timeout: Maximum seconds to wait for final batch processing (default: 10.0)
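The interaction of `max_queue_size` and `drop_on_overflow` can be illustrated with a small stand-in. The real processor is async and lock-protected; this synchronous sketch only models the overflow decision and the drop/overflow counters implied by the attributes below:

```python
from collections import deque

class OverflowSketch:
    """Illustrative stand-in: when the queue is full and drop_on_overflow
    is True, drop the item and count it."""

    def __init__(self, max_queue_size: int = 3, drop_on_overflow: bool = True):
        self._queue: deque = deque()
        self._max_queue_size = max_queue_size
        self._drop_on_overflow = drop_on_overflow
        self.items_dropped = 0
        self.queue_overflows = 0

    def offer(self, item) -> bool:
        if len(self._queue) >= self._max_queue_size:
            self.queue_overflows += 1
            if self._drop_on_overflow:
                self.items_dropped += 1
                return False  # item dropped
            # With drop_on_overflow=False the real processor applies
            # back-pressure (awaits queue space); this sketch just grows.
        self._queue.append(item)
        return True

sketch = OverflowSketch(max_queue_size=3, drop_on_overflow=True)
results = [sketch.offer(i) for i in range(5)]
# results == [True, True, True, False, False]; sketch.items_dropped == 2
```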

Note:

The done_callback for pipeline integration is automatically set by ProcessingExporter when the processor is added to a pipeline. For standalone usage, call set_done_callback().
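For standalone use, the wiring might look like the following sketch. `BatchingSketch` is an illustrative stand-in that mirrors the documented method names (`set_done_callback`, `process`, `force_flush`) since the real class lives in the nat package; its exact routing behavior is an assumption, not the library's implementation:

```python
import asyncio
from collections import deque
from collections.abc import Awaitable, Callable

class BatchingSketch:
    """Stand-in mirroring the documented surface of BatchingProcessor."""

    def __init__(self, batch_size: int = 2):
        self._batch_size = batch_size
        self._queue: deque = deque()
        self._done_callback: Callable[[list], Awaitable[None]] | None = None

    def set_done_callback(self, callback: Callable[[list], Awaitable[None]]) -> None:
        self._done_callback = callback

    async def process(self, item) -> list:
        # Size-based batches are returned directly through the pipeline.
        self._queue.append(item)
        if len(self._queue) >= self._batch_size:
            batch = list(self._queue)
            self._queue.clear()
            return batch
        return []

    async def force_flush(self) -> list:
        # Out-of-band flushes are routed via the done callback.
        batch = list(self._queue)
        self._queue.clear()
        if batch and self._done_callback is not None:
            await self._done_callback(batch)
        return batch

received: list = []

async def export(batch: list) -> None:
    received.append(batch)

async def demo() -> list:
    proc = BatchingSketch(batch_size=2)
    proc.set_done_callback(export)   # manual wiring for standalone use
    returned = [await proc.process(item) for item in ("a", "b", "c")]
    await proc.force_flush()         # trailing partial batch goes via callback
    return returned

returned = asyncio.run(demo())
# returned == [[], ["a", "b"], []]; received == [["c"]]
```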

_batch_size = 100#
_flush_interval = 5.0#
_max_queue_size = 1000#
_drop_on_overflow = False#
_shutdown_timeout = 10.0#
_done_callback: collections.abc.Callable[[list[T]], collections.abc.Awaitable[None]] | None = None#
_batch_queue: collections.deque[T]#
_last_flush_time#
_flush_task: asyncio.Task | None = None#
_batch_lock#
_shutdown_requested = False#
_shutdown_complete = False#
_shutdown_complete_event#
_done = None#
_batches_created = 0#
_items_processed = 0#
_items_dropped = 0#
_queue_overflows = 0#
_shutdown_batches = 0#
async process(item: T) → list[T]#

Process an item by adding it to the batch queue.

Returns a batch when the batching conditions are met; otherwise returns an empty list. This maintains the Processor[T, List[T]] contract while handling the batching logic.

During shutdown, immediately returns items as single-item batches to ensure no data loss.

Args:

item: The item to add to the current batch

Returns:

List[T]: A batch of items when ready, empty list otherwise
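The return contract, an empty list until a batch is ready, can be mimicked with a minimal stand-in. The real method is async-safe and also honors flush_interval and shutdown; this sketch covers only the size-based case:

```python
import asyncio
from collections import deque

class SizeBatchSketch:
    """Illustrative stand-in for the process() contract: returns []
    until batch_size items have accumulated, then returns the batch."""

    def __init__(self, batch_size: int = 3):
        self._batch_size = batch_size
        self._queue: deque = deque()

    async def process(self, item) -> list:
        self._queue.append(item)
        if len(self._queue) >= self._batch_size:
            batch = list(self._queue)
            self._queue.clear()
            return batch
        return []  # batch not ready yet

async def demo() -> list:
    p = SizeBatchSketch(batch_size=3)
    return [await p.process(i) for i in range(4)]

out = asyncio.run(demo())
# out == [[], [], [0, 1, 2], []]
```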

set_done_callback(
callback: collections.abc.Callable[[list[T]], collections.abc.Awaitable[None]],
)#

Set callback function for routing batches through the remaining pipeline.

This is automatically set by ProcessingExporter.add_processor() to continue batches through downstream processors before final export.

async _schedule_flush()#

Schedule a flush after the flush interval.

async _create_batch() → list[T]#

Create a batch from the current queue.

async force_flush() → list[T]#

Force an immediate flush of all queued items.

Returns:

List[T]: The current batch, empty list if no items queued

async shutdown() → None#

Shutdown the processor and ensure all items are processed.

CRITICAL: This method is called by ProcessingExporter._cleanup() to ensure no items are lost during shutdown. It immediately routes any remaining items as a final batch through the rest of the processing pipeline.

get_stats() → dict[str, Any]#

Get comprehensive batching statistics.
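Judging from the counter attributes above (_batches_created, _items_processed, _items_dropped, _queue_overflows, _shutdown_batches), the returned dictionary plausibly aggregates those counters alongside the configuration. The key names and values below are assumptions for illustration, not the library's actual schema; consult the real get_stats() output before relying on any key:

```python
# Hypothetical shape only — the real get_stats() schema may differ.
stats = {
    "batch_size": 100,
    "flush_interval": 5.0,
    "batches_created": 42,
    "items_processed": 4150,
    "items_dropped": 0,
    "queue_overflows": 0,
    "shutdown_batches": 1,
}

# A derived monitoring metric: average items per batch.
avg_batch = stats["items_processed"] / max(stats["batches_created"], 1)
```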