
XennaExecutor


XennaExecutor is the production executor that uses Cosmos-Xenna for distributed execution. It’s the default executor used when running pipelines.

Import

from nemo_curator.backends.xenna import XennaExecutor

Class Definition

class XennaExecutor(BaseExecutor):
    """Production executor using Cosmos-Xenna for distributed execution.

    Provides:
    - Distributed task orchestration
    - Resource allocation and management
    - Batch processing optimization
    - Performance metrics collection
    """

    def __init__(
        self,
        config: dict[str, Any] | None = None,
        ignore_head_node: bool = False,
    ) -> None:
        """Initialize the executor.

        Args:
            config: Executor configuration dictionary.
            ignore_head_node: Not supported. Raises ValueError if True.
        """

Configuration Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| logging_interval | int | 60 | Seconds between progress logs |
| ignore_failures | bool | False | Continue on task failures |
| max_workers_per_stage | int \| None | None | Max workers per stage |
| execution_mode | str | "streaming" | "streaming" or "batch" |
| cpu_allocation_percentage | float | 0.95 | CPU allocation fraction |
| autoscale_interval_s | int | 180 | Autoscaling check interval |
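Because the options are passed as a plain dict, a misspelled key can silently fall back to its default. A small helper can guard against this; note that `check_executor_config` is a hypothetical sketch for illustration, not part of NeMo Curator, and it only knows the option names from the table above:

```python
# Hypothetical helper (not part of NeMo Curator): reject option names
# that do not appear in the configuration table above.
VALID_OPTIONS = {
    "logging_interval",
    "ignore_failures",
    "max_workers_per_stage",
    "execution_mode",
    "cpu_allocation_percentage",
    "autoscale_interval_s",
}

def check_executor_config(config: dict) -> dict:
    """Raise ValueError if the config contains an unknown option name."""
    unknown = set(config) - VALID_OPTIONS
    if unknown:
        raise ValueError(f"Unknown executor options: {sorted(unknown)}")
    return config
```

The validated dict can then be passed straight to `XennaExecutor(config=...)`.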

Usage Examples

Default Configuration

from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.xenna import XennaExecutor

pipeline = Pipeline(name="my_pipeline", stages=[...])

# Default executor
results = pipeline.run()

# Equivalent to:
executor = XennaExecutor()
results = pipeline.run(executor=executor)

Custom Configuration

executor = XennaExecutor(config={
    "logging_interval": 30,
    "ignore_failures": True,
    "execution_mode": "batch",
    "cpu_allocation_percentage": 0.9,
})

results = pipeline.run(executor=executor)

Streaming vs Batch Mode

Streaming mode processes tasks as they become available:

executor = XennaExecutor(config={
    "execution_mode": "streaming",
})

Best for:

  • Large datasets
  • Memory-constrained environments
  • Real-time processing
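Batch mode is selected through the same `execution_mode` option (as also shown in the Custom Configuration example above):

```python
executor = XennaExecutor(config={
    "execution_mode": "batch",
})
```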

Methods

execute()

Execute the pipeline stages.

def execute(
    self,
    stages: list[ProcessingStage],
    initial_tasks: list[Task] | None = None,
) -> list[Task]:
    """Execute the pipeline stages.

    Args:
        stages: List of processing stages to execute.
        initial_tasks: Initial tasks (defaults to EmptyTask).

    Returns:
        List of output tasks from the final stage.
    """

Error Handling

executor = XennaExecutor(config={
    "ignore_failures": True,  # Continue despite errors
})

try:
    results = pipeline.run(executor=executor)
except Exception as e:
    # Handle pipeline-level failures
    print(f"Pipeline failed: {e}")

Performance Monitoring

The executor automatically collects performance metrics:

results = pipeline.run(executor=executor)

# Each task contains performance data
for task in results:
    for perf in task._stage_perf:
        print(f"Stage: {perf.stage_name}")
        print(f"  Duration: {perf.process_time}s")
        print(f"  Items processed: {perf.num_items_processed}")
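The per-task loop above can be rolled up into a per-stage summary. The helper below is a sketch, not a library API; it assumes only the `_stage_perf` fields already shown (`stage_name`, `process_time`, `num_items_processed`):

```python
from collections import defaultdict

def summarize_stage_perf(results):
    """Sum process time and item counts per stage across all output tasks.

    Sketch helper (not part of NeMo Curator); relies only on the
    `_stage_perf` fields shown in the example above.
    """
    totals = defaultdict(lambda: {"process_time": 0.0, "num_items": 0})
    for task in results:
        for perf in task._stage_perf:
            entry = totals[perf.stage_name]
            entry["process_time"] += perf.process_time
            entry["num_items"] += perf.num_items_processed
    return dict(totals)
```

This gives one totals entry per stage name, which is usually easier to scan than per-task output when a run produces many tasks.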
