
Pipeline Execution Backends


Configure and optimize execution backends to run NeMo Curator pipelines efficiently across single machines, multi-GPU systems, and distributed clusters.

Overview

Execution backends (executors) are the engines that run NeMo Curator Pipeline workflows across your compute resources. They handle:

  • Task Distribution: Distribute pipeline stages across available workers and GPUs
  • Resource Management: Allocate CPU, GPU, and memory resources to processing tasks
  • Scaling: Automatically or manually scale processing based on workload
  • Data Movement: Optimize data transfer between pipeline stages

Choosing the right executor impacts:

  • Pipeline performance and throughput
  • Resource utilization efficiency
  • Ease of deployment and monitoring

This guide covers all execution backends available in NeMo Curator and applies to all modalities: text, image, video, and audio curation.

Basic Usage Pattern

All pipelines follow this standard execution pattern:

```python
from nemo_curator.pipeline import Pipeline

pipeline = Pipeline(name="example_pipeline", description="Curator pipeline")
pipeline.add_stage(...)

# Choose an executor below and run
results = pipeline.run(executor)
```

Key points:

  • The same pipeline definition works with any executor
  • Executor choice is independent of pipeline stages
  • Switch executors without changing pipeline code
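Because the pipeline never references a concrete executor, backend selection can live behind a small lookup. The registry below is an illustrative sketch, not a NeMo Curator API; the import paths are the ones shown later in this guide:

```python
# Illustrative executor registry (not a NeMo Curator API): maps short names
# to the import paths of the executors covered in this guide.
EXECUTOR_REGISTRY = {
    "xenna": "nemo_curator.backends.xenna.XennaExecutor",
    "ray_data": "nemo_curator.backends.experimental.ray_data.RayDataExecutor",
    "ray_actor_pool": "nemo_curator.backends.experimental.ray_actor_pool.RayActorPoolExecutor",
}

def resolve_executor(name: str):
    """Import and instantiate an executor by short name.

    Because the pipeline definition is executor-agnostic, swapping
    backends becomes a one-line change at the call site.
    """
    import importlib

    module_path, class_name = EXECUTOR_REGISTRY[name].rsplit(".", 1)
    cls = getattr(importlib.import_module(module_path), class_name)
    return cls()
```

With a helper like this, `pipeline.run(resolve_executor("xenna"))` and `pipeline.run(resolve_executor("ray_data"))` run the same pipeline definition on different backends.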

Available Backends

XennaExecutor

XennaExecutor is the production-ready executor that uses Cosmos-Xenna, a Ray-based execution engine optimized for distributed data processing. Xenna provides native streaming support, automatic resource scaling, and built-in fault tolerance. It’s the recommended choice for most production workloads, especially for video and multimodal pipelines.

Key Features:

  • Streaming execution: Process data incrementally as it arrives, reducing memory requirements
  • Auto-scaling: Dynamically adjusts worker allocation based on stage throughput
  • Fault tolerance: Built-in error handling and recovery mechanisms
  • Resource optimization: Efficient CPU and GPU allocation for video/multimodal workloads
```python
from nemo_curator.backends.xenna import XennaExecutor

executor = XennaExecutor(
    config={
        # Execution mode: "streaming" (default) or "batch"
        # Batch processes all data for a stage before moving to the next;
        # streaming runs stages concurrently.
        "execution_mode": "streaming",

        # Logging interval: seconds between status logs (default: 60)
        # Controls how frequently progress updates are printed
        "logging_interval": 60,

        # Ignore failures: whether to continue on failures (default: False)
        # When True, the pipeline continues execution instead of failing
        # fast when stages raise errors.
        "ignore_failures": False,

        # CPU allocation percentage: ratio of CPU to allocate (0-1, default: 0.95)
        # Fraction of available CPU resources to use for pipeline execution
        "cpu_allocation_percentage": 0.95,

        # Autoscale interval: seconds between auto-scaling checks (default: 180)
        # How often to run the stage auto-scaler
        "autoscale_interval_s": 180,

        # Max workers per stage: maximum number of workers (optional)
        # Limits worker count per stage; None means no limit
        "max_workers_per_stage": None,
    }
)

results = pipeline.run(executor)
```

Configuration Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `execution_mode` | `str` | `"streaming"` | `"streaming"` for incremental processing or `"batch"` for full-dataset processing |
| `logging_interval` | `int` | `60` | Seconds between status log updates |
| `ignore_failures` | `bool` | `False` | If `True`, continue pipeline execution even when stages fail |
| `cpu_allocation_percentage` | `float` | `0.95` | Fraction (0-1) of available CPU resources to allocate |
| `autoscale_interval_s` | `int` | `180` | Seconds between auto-scaling evaluations |
| `max_workers_per_stage` | `int \| None` | `None` | Maximum workers per stage; `None` means no limit |
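A small pre-flight check can catch typos in the config dict before a long-running job starts. The validator below is a hypothetical helper (not part of NeMo Curator), written against the parameter table above:

```python
# Hypothetical pre-flight check (not a NeMo Curator API): validates an
# XennaExecutor config dict against the documented parameters and defaults.
def validate_xenna_config(config: dict) -> dict:
    defaults = {
        "execution_mode": "streaming",
        "logging_interval": 60,
        "ignore_failures": False,
        "cpu_allocation_percentage": 0.95,
        "autoscale_interval_s": 180,
        "max_workers_per_stage": None,
    }
    unknown = set(config) - set(defaults)
    if unknown:
        raise ValueError(f"unknown config keys: {sorted(unknown)}")
    merged = {**defaults, **config}
    if merged["execution_mode"] not in ("streaming", "batch"):
        raise ValueError("execution_mode must be 'streaming' or 'batch'")
    if not 0 < merged["cpu_allocation_percentage"] <= 1:
        raise ValueError("cpu_allocation_percentage must be in (0, 1]")
    return merged
```

Calling it before constructing the executor turns a silently ignored or misspelled key into an immediate error.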

For more details, refer to the official NVIDIA Cosmos-Xenna project.
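The difference between the two execution modes can be illustrated with a toy model (plain Python, not Xenna itself): in batch mode a stage finishes the whole dataset before the next stage starts, while in streaming mode items flow through all stages concurrently, so later stages begin before earlier ones have seen every item.

```python
# Toy illustration of the two execution modes (not Xenna itself).
# Stages are plain functions; the log records which stage touched which
# value, exposing the ordering difference between the modes.

def run_batch(items, stages):
    """Batch: each stage processes the whole dataset before the next starts."""
    log = []
    for name, fn in stages:
        items = [fn(x) for x in items]
        log.extend((name, x) for x in items)
    return items, log

def run_streaming(items, stages):
    """Streaming: each item flows through all stages before the next item."""
    log, out = [], []
    for x in items:
        for name, fn in stages:
            x = fn(x)
            log.append((name, x))
        out.append(x)
    return out, log
```

Both modes produce identical outputs; only the interleaving differs, which is why streaming reduces the amount of intermediate data held in memory at once.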

RayDataExecutor

RayDataExecutor uses Ray Data, a scalable data processing library built on Ray Core. Ray Data provides a familiar DataFrame-like API for distributed data transformations. This executor is experimental and best suited for large-scale batch processing tasks that benefit from Ray Data’s optimized data loading and transformation pipelines.

Key Features:

  • Ray Data API: Leverages Ray Data’s optimized data processing primitives
  • Scalable transformations: Efficient map-batch operations across distributed workers
  • Experimental status: API and performance characteristics may change
```python
from nemo_curator.backends.experimental.ray_data import RayDataExecutor

executor = RayDataExecutor()
results = pipeline.run(executor)
```

RayDataExecutor currently has limited configuration options. For more control over execution, consider using XennaExecutor or RayActorPoolExecutor.

RayActorPoolExecutor

RayActorPoolExecutor uses pools of Ray actors for custom distributed processing patterns, such as deduplication, that benefit from long-lived stateful workers rather than stateless tasks.

```python
from nemo_curator.backends.experimental.ray_actor_pool import RayActorPoolExecutor

executor = RayActorPoolExecutor()
results = pipeline.run(executor)
```
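The actor-pool pattern itself can be sketched in plain Python. The example below uses threads rather than Ray (Ray actors are separate processes with isolated state), but the shape is the same: long-lived stateful workers pull tasks from a shared queue, which is what makes the pattern a good fit for stateful workloads like deduplication.

```python
# Minimal sketch of the actor-pool pattern using threads (illustrative;
# not Ray). Long-lived stateful workers pull tasks from a shared queue.
import queue
import threading

class Actor:
    """A stateful worker; a dedup worker could hold a hash index here."""
    def __init__(self, actor_id):
        self.actor_id = actor_id
        self.processed = 0  # per-actor state that persists across tasks

    def process(self, item):
        self.processed += 1
        return item * item

def run_actor_pool(items, num_actors=2):
    tasks, results = queue.Queue(), []
    for it in items:
        tasks.put(it)
    lock = threading.Lock()

    def worker(actor):
        while True:
            try:
                item = tasks.get_nowait()
            except queue.Empty:
                return  # queue drained; actor retires
            out = actor.process(item)
            with lock:
                results.append(out)

    actors = [Actor(i) for i in range(num_actors)]
    threads = [threading.Thread(target=worker, args=(a,)) for a in actors]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(results)
```

The key property is that each actor keeps its state (`processed`, or a hash index in a real dedup workload) across many tasks, instead of rebuilding it per task.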

Ray Executors in Practice

Ray-based executors provide enhanced scalability and performance for large-scale data processing tasks. They’re beneficial for:

  • Large-scale classification tasks: Distributed inference across multi-GPU setups
  • Deduplication workflows: Parallel processing of document similarity computations
  • Resource-intensive stages: Automatic scaling based on computational demands

When to Use Ray Executors

Consider Ray executors when:

  • Processing datasets that exceed single-machine capacity
  • Running GPU-intensive stages (classifiers, embedding models, etc.)
  • Needing automatic fault tolerance and recovery
  • Scaling across multi-node clusters

Ray vs. Xenna Executors

| Feature | XennaExecutor | Ray Executors |
| --- | --- | --- |
| Maturity | Production-ready | Experimental |
| Streaming | Native support | Limited |
| Resource Management | Optimized for video/multimodal | General-purpose |
| Fault Tolerance | Built-in | Ray-native |
| Scaling | Auto-scaling | Manual configuration |

Recommendation: Use XennaExecutor for production workloads and Ray executors for experimental large-scale processing.

Ray executors emit an experimental warning as the API and performance characteristics may change.

Choosing a Backend

Both options can deliver strong performance; choose based on API fit and maturity:

  • XennaExecutor: Default for most workloads due to maturity and extensive real-world usage (including video pipelines); supports streaming and batch execution with auto-scaling.
  • Ray Executors (experimental): Use Ray Data API for scalable data processing; the interface is still experimental and may change.
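The guidance above reduces to a simple rule, sketched here as a hypothetical helper (not a NeMo Curator API): default to Xenna unless you are experimenting and specifically want the Ray Data API.

```python
# Hypothetical decision helper (not a NeMo Curator API) encoding the
# guidance above: Xenna for production, Ray Data only for experiments
# that want its API.
def recommend_executor(production: bool, wants_ray_data_api: bool = False) -> str:
    if production or not wants_ray_data_api:
        return "XennaExecutor"
    return "RayDataExecutor"
```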

Minimal End-to-End Example

```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.xenna import XennaExecutor

# Build your pipeline
pipeline = Pipeline(name="curator_pipeline")
# pipeline.add_stage(stage1)
# pipeline.add_stage(stage2)

# Run with Xenna (recommended)
executor = XennaExecutor(config={"execution_mode": "streaming"})
results = pipeline.run(executor)

print(f"Completed with {len(results) if results else 0} output tasks")
```