nat.observability.processor.batching_processor#
Attributes#
Classes#
BatchingProcessor: Pass-through batching processor that accumulates items and outputs batched lists.
Module Contents#
- logger#
- T#
- class BatchingProcessor(
- batch_size: int = 100,
- flush_interval: float = 5.0,
- max_queue_size: int = 1000,
- drop_on_overflow: bool = False,
- shutdown_timeout: float = 10.0,
- )#
Bases: nat.observability.processor.callback_processor.CallbackProcessor[T, list[T]], Generic[T]
Pass-through batching processor that accumulates items and outputs batched lists.
This processor extends CallbackProcessor[T, List[T]] to provide batching functionality. It accumulates individual items and outputs them as batches when size or time thresholds are met. The batched output continues through the processing pipeline.
CRITICAL: Implements proper cleanup to ensure NO ITEMS ARE LOST during shutdown. The ProcessingExporter._cleanup() method calls shutdown() on all processors.
Key Features:
- Pass-through design: Processor[T, List[T]]
- Size-based and time-based batching
- Pipeline flow: batches continue through downstream processors
- GUARANTEED: No items lost during cleanup
- Comprehensive statistics and monitoring
- Proper cleanup and shutdown handling
- High-performance async implementation
- Back-pressure handling with queue limits
- Pipeline Flow:
Normal processing: Individual items → BatchingProcessor → List[items] → downstream processors → export
Time-based flush: Scheduled batches automatically continue through the remaining pipeline
Shutdown: The final batch is immediately routed through the remaining pipeline
- Cleanup Guarantee:
When shutdown() is called, this processor:
1. Stops accepting new items
2. Creates a final batch from all queued items
3. Immediately routes the final batch through the remaining pipeline via the callback
4. Ensures zero data loss with no external coordination needed
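The four steps above can be sketched with a minimal, self-contained stand-in. The `ShutdownSketch` class and `export` callback below are hypothetical illustrations of the documented shutdown sequence, not the library's implementation:

```python
import asyncio
from collections import deque


class ShutdownSketch:
    """Hypothetical stand-in illustrating the documented shutdown sequence."""

    def __init__(self):
        self._queue: deque = deque()
        self._shutdown_requested = False
        self._done_callback = None

    def set_done_callback(self, callback):
        # In the real pipeline this is auto-wired by ProcessingExporter.
        self._done_callback = callback

    async def process(self, item) -> list:
        if self._shutdown_requested:
            return [item]  # during shutdown: immediate single-item batch
        self._queue.append(item)
        return []

    async def shutdown(self):
        self._shutdown_requested = True             # 1. stop accepting new items
        final_batch = list(self._queue)             # 2. batch all queued items
        self._queue.clear()
        if final_batch and self._done_callback:
            await self._done_callback(final_batch)  # 3. route via the callback
        # 4. zero data loss: every queued item was handed downstream


exported = []


async def export(batch):
    exported.append(batch)


async def main():
    p = ShutdownSketch()
    p.set_done_callback(export)
    await p.process("a")
    await p.process("b")
    await p.shutdown()
    print(exported)  # [['a', 'b']]


asyncio.run(main())
```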
- Usage in Pipeline:
```python
# Individual spans → batched spans → continue through downstream processors
exporter.add_processor(BatchingProcessor[Span](batch_size=100))  # auto-wired with pipeline callback
exporter.add_processor(FilterProcessor())     # processes List[Span] from batching
exporter.add_processor(TransformProcessor())  # further processing
```
- Args:
batch_size: Maximum items per batch (default: 100)
flush_interval: Maximum seconds to wait before flushing (default: 5.0)
max_queue_size: Maximum items to queue before blocking (default: 1000)
drop_on_overflow: If True, drop items when the queue is full (default: False)
shutdown_timeout: Maximum seconds to wait for final batch processing (default: 10.0)
- Note:
The done_callback for pipeline integration is automatically set by ProcessingExporter when the processor is added to a pipeline. For standalone usage, call set_done_callback().
- _batch_size = 100#
- _flush_interval = 5.0#
- _max_queue_size = 1000#
- _drop_on_overflow = False#
- _shutdown_timeout = 10.0#
- _done_callback: collections.abc.Callable[[list[T]], collections.abc.Awaitable[None]] | None = None#
- _batch_queue: collections.deque[T]#
- _last_flush_time#
- _flush_task: asyncio.Task | None = None#
- _batch_lock#
- _shutdown_requested = False#
- _shutdown_complete = False#
- _shutdown_complete_event#
- _done = None#
- _batches_created = 0#
- _items_processed = 0#
- _items_dropped = 0#
- _queue_overflows = 0#
- _shutdown_batches = 0#
- async process(item: T) → list[T]#
Process an item by adding it to the batch queue.
Returns a batch when the batching conditions are met, otherwise returns an empty list. This maintains the Processor[T, List[T]] contract while handling the batching logic.
During shutdown, immediately returns items as single-item batches to ensure no data loss.
- Args:
item: The item to add to the current batch
- Returns:
List[T]: A batch of items when ready, empty list otherwise
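The pass-through contract described above can be illustrated with a self-contained sketch. `SizeBatchSketch` is a hypothetical stand-in covering only the size-based path (no timer, queue limits, or shutdown handling):

```python
import asyncio
from collections import deque


class SizeBatchSketch:
    """Hypothetical sketch of the process() contract (size-based only)."""

    def __init__(self, batch_size: int = 3):
        self._batch_size = batch_size
        self._queue: deque = deque()

    async def process(self, item) -> list:
        self._queue.append(item)
        if len(self._queue) >= self._batch_size:
            batch = list(self._queue)  # threshold met: emit the full batch
            self._queue.clear()
            return batch
        return []  # not ready yet: empty list, item stays queued


async def main():
    p = SizeBatchSketch(batch_size=3)
    results = [await p.process(i) for i in range(4)]
    print(results)  # [[], [], [0, 1, 2], []]


asyncio.run(main())
```

Downstream processors therefore see either a complete batch or nothing, which is what lets the batched output flow through the rest of the pipeline unchanged.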
- set_done_callback(
- callback: collections.abc.Callable[[list[T]], collections.abc.Awaitable[None]],
- )#
Set callback function for routing batches through the remaining pipeline.
This is automatically set by ProcessingExporter.add_processor() to continue batches through downstream processors before final export.
- async _schedule_flush()#
Schedule a flush after the flush interval.
- async force_flush() → list[T]#
Force an immediate flush of all queued items.
- Returns:
List[T]: The current batch, empty list if no items queued
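Unlike process(), force_flush() drains whatever is queued regardless of the size threshold. A minimal, self-contained sketch of that semantic (the `FlushSketch` class is hypothetical, not the library code):

```python
import asyncio
from collections import deque


class FlushSketch:
    """Hypothetical sketch of the force_flush() semantic."""

    def __init__(self):
        self._queue: deque = deque()

    async def process(self, item) -> list:
        self._queue.append(item)
        return []  # stays below any size threshold in this sketch

    async def force_flush(self) -> list:
        # Drain everything currently queued, regardless of batch size.
        batch = list(self._queue)
        self._queue.clear()
        return batch


async def main():
    p = FlushSketch()
    await p.process("a")
    await p.process("b")
    print(await p.force_flush())  # ['a', 'b']
    print(await p.force_flush())  # []


asyncio.run(main())
```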