Pipeline Execution Backends#

Configure and optimize execution backends to run NeMo Curator pipelines efficiently across single machines, multi-GPU systems, and distributed clusters.

Overview#

Execution backends (executors) are the engines that run NeMo Curator Pipeline workflows across your compute resources. They handle:

  • Task Distribution: Distribute pipeline stages across available workers and GPUs

  • Resource Management: Allocate CPU, GPU, and memory resources to processing tasks

  • Scaling: Automatically or manually scale processing based on workload

  • Data Movement: Optimize data transfer between pipeline stages

Choosing the right executor impacts:

  • Pipeline performance and throughput

  • Resource utilization efficiency

  • Ease of deployment and monitoring

This guide covers all execution backends available in NeMo Curator and applies to all modalities: text, image, video, and audio curation.

Basic Usage Pattern#

All pipelines follow this standard execution pattern:

from nemo_curator.pipeline import Pipeline

pipeline = Pipeline(name="example_pipeline", description="Curator pipeline")
pipeline.add_stage(...)

# Choose an executor below and run
results = pipeline.run(executor)

Key points:

  • The same pipeline definition works with any executor

  • Executor choice is independent of pipeline stages

  • Switch executors without changing pipeline code, as the sketch below shows
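
A minimal sketch of this interchangeability, continuing the pipeline defined above. It assumes the executor classes introduced later in this guide, and that XennaExecutor accepts a no-argument constructor (the end-to-end example at the bottom of this page passes a config instead):

from nemo_curator.backends.experimental.ray_actor_pool import RayActorPoolExecutor
from nemo_curator.backends.xenna import XennaExecutor

# Flip this flag to change backends; the pipeline definition is untouched.
use_ray_actor_pool = False
executor = RayActorPoolExecutor() if use_ray_actor_pool else XennaExecutor()
results = pipeline.run(executor)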

Available Backends#
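
XennaExecutor#

The production-ready backend recommended throughout this guide; the Choosing a Backend section below notes that it supports streaming and batch execution with auto-scaling. This snippet mirrors the end-to-end example at the bottom of this page:

from nemo_curator.backends.xenna import XennaExecutor

# "streaming" processes tasks as they become available; batch execution
# is also supported (see Choosing a Backend below).
executor = XennaExecutor(config={"execution_mode": "streaming"})
results = pipeline.run(executor)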

RayActorPoolExecutor#

RayActorPoolExecutor uses Ray actor pools to support custom distributed processing patterns, such as deduplication.

from nemo_curator.backends.experimental.ray_actor_pool import RayActorPoolExecutor

# Run the pipeline on a pool of long-lived Ray actors
executor = RayActorPoolExecutor()
results = pipeline.run(executor)

RayDataExecutor (experimental)#

RayDataExecutor uses Ray Data, a scalable data processing library built on Ray Core. Ray Data provides a familiar DataFrame-like API for distributed data transformations. This executor is experimental and best suited for large-scale batch processing tasks that benefit from Ray Data’s optimized data loading and transformation pipelines.

Key Features:

  • Ray Data API: Leverages Ray Data’s optimized data processing primitives

  • Scalable transformations: Efficient map-batch operations across distributed workers

  • Experimental status: API and performance characteristics may change

from nemo_curator.backends.experimental.ray_data import RayDataExecutor

# Run the pipeline through Ray Data's distributed transformations
executor = RayDataExecutor()
results = pipeline.run(executor)

Ray Executors in Practice#

Ray-based executors scale large data processing workloads across the workers of a Ray cluster. They’re beneficial for the following (see the sketch after this list):

  • Large-scale classification tasks: Distributed inference across multi-GPU setups

  • Deduplication workflows: Parallel processing of document similarity computations

  • Resource-intensive stages: Automatic scaling based on computational demands
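
As an illustration, a hedged sketch of a distributed classification run. The stage names in the comments are hypothetical placeholders, not real NeMo Curator classes; substitute the reader and classifier stages from your own workflow:

from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.experimental.ray_actor_pool import RayActorPoolExecutor

pipeline = Pipeline(name="distributed_classification")
# Hypothetical stages for illustration only:
# pipeline.add_stage(JsonlReaderStage(input_dir="/data/docs"))
# pipeline.add_stage(QualityClassifierStage(batch_size=64))

# The actor pool distributes stage workers across available GPUs
executor = RayActorPoolExecutor()
results = pipeline.run(executor)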

When to Use Ray Executors#

Consider Ray executors when:

  • Processing datasets that exceed single-machine capacity

  • Running GPU-intensive stages (classifiers, embedding models, etc.)

  • Needing automatic fault tolerance and recovery

  • Scaling across multi-node clusters (see the sketch below)
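
For multi-node runs, the standard Ray pattern is to attach to an already-running cluster before executing the pipeline. Whether NeMo Curator’s Ray executors pick up an existing Ray session this way is an assumption here, not something this page confirms:

import ray
from nemo_curator.backends.experimental.ray_data import RayDataExecutor

# Connect to an existing Ray cluster (e.g., started with `ray start --head`
# on the head node and `ray start --address=<head_ip>:6379` on the workers).
ray.init(address="auto")

executor = RayDataExecutor()
results = pipeline.run(executor)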

Ray vs. Xenna Executors#

Feature               XennaExecutor                    Ray Executors
Maturity              Production-ready                 Experimental
Streaming             Native support                   Limited
Resource Management   Optimized for video/multimodal   General-purpose
Fault Tolerance       Built-in                         Ray-native
Scaling               Auto-scaling                     Manual configuration

Recommendation: Use XennaExecutor for production workloads and Ray executors for experimental large-scale processing.

Note

Ray executors emit an experimental warning as the API and performance characteristics may change.

Choosing a Backend#

Both options can deliver strong performance; choose based on API fit and maturity:

  • XennaExecutor: Default for most workloads due to maturity and extensive real-world usage (including video pipelines); supports streaming and batch execution with auto-scaling.

  • Ray Executors (experimental): Use Ray Data API for scalable data processing; the interface is still experimental and may change.

Minimal End-to-End Example#

from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.xenna import XennaExecutor

# Build your pipeline
pipeline = Pipeline(name="curator_pipeline")
# pipeline.add_stage(stage1)
# pipeline.add_stage(stage2)

# Run with Xenna (recommended)
executor = XennaExecutor(config={"execution_mode": "streaming"})
results = pipeline.run(executor)

print(f"Completed with {len(results) if results else 0} output tasks")