---
title: XennaExecutor
description: >-
  API reference for XennaExecutor - the production executor using Cosmos-Xenna
  for distributed execution
---

`XennaExecutor` is the production executor that uses Cosmos-Xenna for distributed execution. It's the default executor used when running pipelines.

## Import

```python
from nemo_curator.backends.xenna import XennaExecutor
```

## Class Definition

```python
class XennaExecutor(BaseExecutor):
    """Production executor using Cosmos-Xenna for distributed execution.

    Provides:
    - Distributed task orchestration
    - Resource allocation and management
    - Batch processing optimization
    - Performance metrics collection
    """

    def __init__(
        self,
        config: dict[str, Any] | None = None,
        ignore_head_node: bool = False,
    ) -> None:
        """Initialize the executor.

        Args:
            config: Executor configuration dictionary.
            ignore_head_node: Not supported. Raises ValueError if True.
        """
```

## Configuration Options

| Option                      | Type          | Default       | Description                   |
| --------------------------- | ------------- | ------------- | ----------------------------- |
| `logging_interval`          | `int`         | `60`          | Seconds between progress logs |
| `ignore_failures`           | `bool`        | `False`       | Continue on task failures     |
| `max_workers_per_stage`     | `int \| None` | `None`        | Max workers per stage         |
| `execution_mode`            | `str`         | `"streaming"` | `"streaming"` or `"batch"`    |
| `cpu_allocation_percentage` | `float`       | `0.95`        | CPU allocation fraction       |
| `autoscale_interval_s`      | `int`         | `180`         | Autoscaling check interval    |

## Usage Examples

### Default Configuration

```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.xenna import XennaExecutor

pipeline = Pipeline(name="my_pipeline", stages=[...])

# Default executor
results = pipeline.run()

# Equivalent to:
executor = XennaExecutor()
results = pipeline.run(executor=executor)
```

### Custom Configuration

```python
executor = XennaExecutor(config={
    "logging_interval": 30,
    "ignore_failures": True,
    "execution_mode": "batch",
    "cpu_allocation_percentage": 0.9,
})

results = pipeline.run(executor=executor)
```

### Streaming vs Batch Mode

#### Streaming Mode

Processes tasks as they become available:

```python
executor = XennaExecutor(config={
    "execution_mode": "streaming",
})
```

**Best for:**

* Large datasets
* Memory-constrained environments
* Real-time processing

#### Batch Mode

Waits for all tasks before processing:

```python
executor = XennaExecutor(config={
    "execution_mode": "batch",
})
```

**Best for:**

* Small to medium datasets
* Operations requiring global ordering
* When task dependencies span batches

## Methods

### `execute()`

Execute the pipeline stages.

```python
def execute(
    self,
    stages: list[ProcessingStage],
    initial_tasks: list[Task] | None = None,
) -> list[Task]:
    """Execute the pipeline stages.

    Args:
        stages: List of processing stages to execute.
        initial_tasks: Initial tasks (defaults to EmptyTask).

    Returns:
        List of output tasks from the final stage.
    """
```

## Error Handling

```python
executor = XennaExecutor(config={
    "ignore_failures": True,  # Continue despite errors
})

try:
    results = pipeline.run(executor=executor)
except Exception as e:
    # Handle pipeline-level failures
    print(f"Pipeline failed: {e}")
```

## Performance Monitoring

The executor automatically collects performance metrics:

```python
results = pipeline.run(executor=executor)

# Each task contains performance data
for task in results:
    for perf in task._stage_perf:
        print(f"Stage: {perf.stage_name}")
        print(f"  Duration: {perf.process_time}s")
        print(f"  Items processed: {perf.num_items_processed}")
```

## Source Code

[View source on GitHub](https://github.com/NVIDIA-NeMo/Curator/blob/main/nemo_curator/backends/xenna/executor.py)