---
title: XennaExecutor
description: >-
  API reference for XennaExecutor - the production executor using Cosmos-Xenna
  for distributed execution
---

`XennaExecutor` is the production executor that uses Cosmos-Xenna for distributed execution. It's the default executor used when running pipelines.

## Import

```python
from nemo_curator.backends.xenna import XennaExecutor
```

## Class Definition

```python
class XennaExecutor(BaseExecutor):
    """Production executor using Cosmos-Xenna for distributed execution.

    Provides:
    - Distributed task orchestration
    - Resource allocation and management
    - Batch processing optimization
    - Performance metrics collection
    """

    def __init__(
        self,
        config: dict[str, Any] | None = None,
        ignore_head_node: bool = False,
    ) -> None:
        """Initialize the executor.

        Args:
            config: Executor configuration dictionary.
            ignore_head_node: Not supported. Raises ValueError if True.
        """
```

## Configuration Options

| Option                      | Type          | Default       | Description                   |
| --------------------------- | ------------- | ------------- | ----------------------------- |
| `logging_interval`          | `int`         | `60`          | Seconds between progress logs |
| `ignore_failures`           | `bool`        | `False`       | Continue on task failures     |
| `max_workers_per_stage`     | `int \| None` | `None`        | Max workers per stage         |
| `execution_mode`            | `str`         | `"streaming"` | `"streaming"` or `"batch"`    |
| `cpu_allocation_percentage` | `float`       | `0.95`        | CPU allocation fraction       |
| `autoscale_interval_s`      | `int`         | `180`         | Autoscaling check interval    |

## Usage Examples

### Default Configuration

```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.xenna import XennaExecutor

pipeline = Pipeline(name="my_pipeline", stages=[...])

# Default executor
results = pipeline.run()

# Equivalent to:
executor = XennaExecutor()
results = pipeline.run(executor=executor)
```

### Custom Configuration

```python
executor = XennaExecutor(config={
    "logging_interval": 30,
    "ignore_failures": True,
    "execution_mode": "batch",
    "cpu_allocation_percentage": 0.9,
})

results = pipeline.run(executor=executor)
```

### Streaming vs Batch Mode

<Tabs>
  <Tab title="Streaming Mode">
    Processes tasks as they become available:

    ```python
    executor = XennaExecutor(config={
        "execution_mode": "streaming",
    })
    ```

    **Best for:**

    * Large datasets
    * Memory-constrained environments
    * Real-time processing
  </Tab>

  <Tab title="Batch Mode">
    Waits for all tasks before processing:

    ```python
    executor = XennaExecutor(config={
        "execution_mode": "batch",
    })
    ```

    **Best for:**

    * Small to medium datasets
    * Operations requiring global ordering
    * When task dependencies span batches
  </Tab>
</Tabs>
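The trade-offs above can be reduced to a simple rule of thumb: use batch mode when the dataset comfortably fits in memory, streaming otherwise. The helper below is purely illustrative (the function and its threshold logic are assumptions, not part of the library):

```python
def choose_execution_mode(estimated_bytes: int, memory_budget_bytes: int) -> str:
    """Illustrative heuristic: batch if the dataset fits within the
    available memory budget, streaming otherwise."""
    return "batch" if estimated_bytes <= memory_budget_bytes else "streaming"


mode = choose_execution_mode(
    estimated_bytes=500 * 1024**3,     # ~500 GB dataset
    memory_budget_bytes=64 * 1024**3,  # 64 GB of headroom
)
# mode == "streaming"; pass it through the executor config:
# executor = XennaExecutor(config={"execution_mode": mode})
```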

## Methods

### `execute()`

Execute the pipeline stages.

```python
def execute(
    self,
    stages: list[ProcessingStage],
    initial_tasks: list[Task] | None = None,
) -> list[Task]:
    """Execute the pipeline stages.

    Args:
        stages: List of processing stages to execute.
        initial_tasks: Initial tasks (defaults to EmptyTask).

    Returns:
        List of output tasks from the final stage.
    """
```

## Error Handling

```python
executor = XennaExecutor(config={
    "ignore_failures": True,  # Continue despite errors
})

try:
    results = pipeline.run(executor=executor)
except Exception as e:
    # Handle pipeline-level failures
    print(f"Pipeline failed: {e}")
```

## Performance Monitoring

The executor automatically collects performance metrics:

```python
results = pipeline.run(executor=executor)

# Each task contains performance data
for task in results:
    for perf in task._stage_perf:
        print(f"Stage: {perf.stage_name}")
        print(f"  Duration: {perf.process_time}s")
        print(f"  Items processed: {perf.num_items_processed}")
```
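To turn these per-task records into a per-stage summary, you can aggregate them with a `defaultdict`. The sketch below is self-contained: `StagePerf` is a minimal stand-in for the library's perf record, assuming only the `stage_name`, `process_time`, and `num_items_processed` fields shown above:

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class StagePerf:
    """Minimal stand-in for the library's per-stage perf record."""
    stage_name: str
    process_time: float
    num_items_processed: int


def summarize(perf_records: list[StagePerf]) -> dict[str, tuple[float, int]]:
    """Aggregate (total seconds, total items) per stage name."""
    totals: dict[str, tuple[float, int]] = defaultdict(lambda: (0.0, 0))
    for perf in perf_records:
        secs, items = totals[perf.stage_name]
        totals[perf.stage_name] = (
            secs + perf.process_time,
            items + perf.num_items_processed,
        )
    return dict(totals)


records = [
    StagePerf("tokenize", 1.5, 100),
    StagePerf("tokenize", 2.5, 300),
    StagePerf("filter", 0.5, 50),
]
print(summarize(records))  # {'tokenize': (4.0, 400), 'filter': (0.5, 50)}
```

In real code you would build `perf_records` by collecting `task._stage_perf` entries across the tasks returned by `pipeline.run()`.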

## Source Code

[View source on GitHub](https://github.com/NVIDIA-NeMo/Curator/blob/main/nemo_curator/backends/xenna/executor.py)
