
Per-Stage Runtime Environments


Run pipeline stages with different Python package versions in the same pipeline. Each stage can declare a runtime_env that tells Ray to create an isolated virtualenv for that stage’s workers, so incompatible library versions coexist without conflicts.

Overview

Some curation pipelines require stages that depend on different versions of the same library. For example, one stage might need transformers==4.40.0 for a specific model checkpoint, while another stage needs transformers==4.45.0 for a newer model. Without isolation, these stages cannot coexist in the same pipeline, because a single Python environment can have only one installed version of a given package.

Per-stage runtime environments solve this by using Ray’s native runtime_env support. When a stage declares a runtime_env, Ray creates and caches an isolated virtualenv under /tmp/ray/session_latest/runtime_resources/pip/<hash>/virtualenv. Each unique dependency set gets its own cached virtualenv, reused for the lifetime of the Ray session. No driver-side virtualenv creation or PYTHONPATH manipulation is needed.
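The sketch below makes "each unique dependency set gets its own cached virtualenv" concrete by deriving a cache key from a runtime_env dict. It is a hypothetical illustration: runtime_env_key is not a Ray function, and Ray computes its own <hash> differently. The point is only that an identical specification maps to the same key (so the environment is reused), while changing a pinned version produces a new key (so a new environment is built).

```python
import hashlib
import json


def runtime_env_key(runtime_env: dict) -> str:
    """Hypothetical cache key for a runtime_env spec (not Ray's real hashing)."""
    # Serialize deterministically so dict key order does not change the key.
    canonical = json.dumps(runtime_env, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]


old = {"pip": ["transformers==4.40.0"]}
new = {"pip": ["transformers==4.45.0"]}

# Same spec -> same key (cached env reused); different pin -> different key (new env).
print(runtime_env_key(old) == runtime_env_key(dict(old)))  # True
print(runtime_env_key(old) == runtime_env_key(new))  # False
```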

Usage

Declare dependencies on a stage class

Set runtime_env as a class variable on your ProcessingStage subclass:

```python
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import DocumentBatch


class MyStage(ProcessingStage[DocumentBatch, DocumentBatch]):
    name = "my_stage"
    # Ray builds (or reuses) an isolated virtualenv with these packages for this stage's workers
    runtime_env = {"pip": ["transformers==4.40.0"]}

    def inputs(self):
        return ["data"], []

    def outputs(self):
        return ["data"], []

    def process(self, task):
        # Import inside process() so the module is loaded in the worker, not the driver
        import transformers

        # This worker sees transformers 4.40.0
        ...  # processing logic goes here
        return task
```

Override at instantiation time

Use with_() to change the runtime environment for a specific pipeline without modifying the stage class:

```python
stage = MyStage().with_(runtime_env={"pip": ["transformers==4.45.0"]})
```
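The reason the class stays untouched is ordinary Python attribute lookup: an attribute set on an instance shadows the class variable of the same name. The toy model below shows this. ToyStage and its with_() are hypothetical stand-ins written for illustration, and the "return a modified copy" behavior is an assumption, not a description of NeMo Curator's implementation.

```python
import copy


class ToyStage:
    """Toy stand-in for a ProcessingStage subclass (hypothetical, not NeMo Curator code)."""

    name = "my_stage"
    runtime_env = {"pip": ["transformers==4.40.0"]}  # class-level default

    def with_(self, **overrides):
        # Assumption for illustration: return a modified shallow copy of the instance.
        stage = copy.copy(self)
        for attr, value in overrides.items():
            # Setting an instance attribute shadows the class attribute of the same name.
            setattr(stage, attr, value)
        return stage


stage = ToyStage().with_(runtime_env={"pip": ["transformers==4.45.0"]})
print(stage.runtime_env)     # {'pip': ['transformers==4.45.0']}
print(ToyStage.runtime_env)  # {'pip': ['transformers==4.40.0']}
```

Other instances created without with_() keep reading the class default.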

Use uv as the package installer

Ray also supports uv as the package installer inside worker virtualenvs. Use the "uv" key instead of "pip" for faster installs:

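```python
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import DocumentBatch

class FastInstallStage(ProcessingStage[DocumentBatch, DocumentBatch]):
    name = "fast_install_stage"
    runtime_env = {"uv": ["transformers==4.40.0"]}
    ...  # inputs(), outputs(), and process() as in MyStage
```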

Both "pip" and "uv" keys work regardless of which package manager your local environment uses. The key only controls which installer Ray uses inside the worker virtualenv.

Backend Support

Per-stage runtime environments work with all three execution backends:

| Backend | How runtime_env is applied |
|---|---|
| XennaExecutor | A CuratorRuntimeEnv adapter bridges the stage’s runtime_env dict to Xenna’s env_info interface, which forwards it to Ray actors. |
| RayDataExecutor | The runtime_env is passed through ray_remote_args to map_batches, so each stage’s Ray tasks run in the correct virtualenv. |
| RayActorPoolExecutor | The runtime_env is passed to each actor’s .options() at pool creation time. |
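For the two Ray-native backends, the table amounts to attaching the stage's dict as Ray's runtime_env option. The stdlib-only sketch below shows the shape of those keyword options. ray_options_for is a hypothetical helper, not part of NeMo Curator or Ray; the commented usage lines rely on Ray forwarding extra map_batches keyword arguments as remote args and on ActorClass.options() accepting runtime_env.

```python
from typing import Optional


def ray_options_for(runtime_env: Optional[dict]) -> dict:
    """Hypothetical helper: Ray keyword options for a stage's runtime_env.

    Returns an empty dict when the stage sets no runtime_env, so its tasks
    or actors keep running in the base environment.
    """
    return {"runtime_env": runtime_env} if runtime_env else {}


env = {"uv": ["packaging==24.0"]}
opts = ray_options_for(env)

# Ray Data style: extra keyword args to map_batches are treated as Ray remote args.
#   ds.map_batches(fn, **opts)
# Actor pool style: options are applied per actor before it is created.
#   actor = MyActor.options(**opts).remote()
print(opts)  # {'runtime_env': {'uv': ['packaging==24.0']}}
```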

Behavior

  • Additive isolation: Isolated virtualenvs are cloned from the base environment. Packages installed in the base environment (such as NeMo Curator and its dependencies) remain importable in isolated workers unless explicitly overridden.
  • Caching: Ray caches each unique runtime_env specification. The first task dispatched to a new runtime_env triggers virtualenv creation; subsequent tasks reuse the cached environment.
  • No runtime_env: Stages that do not set runtime_env (the default) run in the base Python environment with no isolation overhead.
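The caching and no-runtime_env behaviors can be modeled with a small in-memory cache. ToyEnvCache is hypothetical and only mimics the observable behavior described above: the first task for a given specification pays the creation cost, later tasks reuse the result, and stages without a runtime_env never trigger creation. Ray's real cache lives on disk and is managed per node.

```python
import json


class ToyEnvCache:
    """Toy model of per-spec environment caching (hypothetical; Ray's real cache differs)."""

    def __init__(self):
        self._envs = {}
        self.creations = 0

    def get(self, runtime_env):
        if not runtime_env:
            # No runtime_env: run in the base environment, nothing to create.
            return "base"
        key = json.dumps(runtime_env, sort_keys=True)
        if key not in self._envs:
            # First task for this spec pays the virtualenv creation cost.
            self.creations += 1
            self._envs[key] = f"venv-{self.creations}"
        return self._envs[key]


cache = ToyEnvCache()
spec = {"pip": ["packaging==23.2"]}
for _ in range(3):  # three tasks dispatched to the same stage
    cache.get(spec)
cache.get(None)  # a stage without runtime_env
print(cache.creations)  # 1
```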

Container Setup

The NeMo Curator container image creates its virtualenv with uv venv --seed, which ensures that pip is available inside the venv. This is required because Ray’s pip-based runtime_env plugin clones the current virtualenv and needs pip to install stage-specific packages in the clone.

If you are running outside the official container, make sure your Python environment has pip available:

```bash
uv venv --seed
uv sync --locked
```
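To confirm that an environment was seeded with pip, you can run a quick check with the same interpreter you use to start Ray. This is a minimal sketch. pip_available is a hypothetical helper name, and it only inspects the interpreter that runs it.

```python
import importlib.util
import sys


def pip_available() -> bool:
    """Return True if the running interpreter can import pip."""
    return importlib.util.find_spec("pip") is not None


# Prints the environment prefix and whether pip is importable from it.
print(sys.prefix, pip_available())
```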

Example: Multi-Version Pipeline

This example runs three copies of one stage in a single pipeline: one in the base environment and two pinned to different versions of the packaging library. It uses RecordPackagingVersionStage, a test stage from PR #1623 that records the packaging library version visible to each worker:

```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.resources import Resources
from nemo_curator.tasks import DocumentBatch


class RecordPackagingVersionStage(ProcessingStage[DocumentBatch, DocumentBatch]):
    """Records the packaging library version visible to this worker."""

    name = "record_packaging_version"
    resources = Resources(cpus=0.5)
    batch_size = 1

    def inputs(self):
        return ["data"], []

    def outputs(self):
        return ["data"], []

    def process(self, task):
        import packaging

        batch = task.to_pandas().copy()
        batch[f"{self.name}_version"] = packaging.__version__
        return DocumentBatch(
            task_id=task.task_id,
            dataset_name=task.dataset_name,
            data=batch,
            _metadata=task._metadata,
            _stage_perf=task._stage_perf,
        )


pipeline = Pipeline(name="multi_version_demo")

# Stage 1: uses the base environment (no runtime_env)
pipeline.add_stage(RecordPackagingVersionStage().with_(name="base_env"))

# Stage 2: needs packaging 23.2 (installed via pip)
pipeline.add_stage(
    RecordPackagingVersionStage()
    .with_(name="pinned_v232", runtime_env={"pip": ["packaging==23.2"]})
)

# Stage 3: needs packaging 24.0 (installed via uv)
pipeline.add_stage(
    RecordPackagingVersionStage()
    .with_(name="pinned_v240", runtime_env={"uv": ["packaging==24.0"]})
)

executor = XennaExecutor(config={"execution_mode": "streaming"})
results = pipeline.run(executor)
```
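RecordPackagingVersionStage reads packaging.__version__, which works because packaging exposes that attribute. Not every library does. A worker can instead query installed distribution metadata through the standard library. visible_version is a hypothetical helper, and the commented line shows where it could replace the attribute lookup inside process().

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def visible_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution as seen by this interpreter.

    Hypothetical helper; returns None when the distribution is not installed.
    """
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


# Inside a stage's process(), this could replace packaging.__version__:
#   batch[f"{self.name}_version"] = visible_version("packaging")
print(visible_version("packaging"))
```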

Limitations

  • First-task latency: The first task dispatched to a stage with a new runtime_env incurs virtualenv creation time. Subsequent tasks reuse the cached environment.
  • Disk usage: Each unique runtime_env creates a separate virtualenv on each worker node. Monitor disk space under /tmp/ray/ for large clusters with many distinct environments.
  • Session scope: Cached virtualenvs are tied to the Ray session. Restarting Ray clears the cache.
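To monitor disk usage, a small script can total the size of each cached environment directory on a worker node. This is a sketch. env_disk_usage is a hypothetical helper, and the default root is the pip location given in the Overview. Environments created through the "uv" key may be stored under a different subdirectory of runtime_resources, so adjust the root if needed.

```python
import os
from pathlib import Path


def env_disk_usage(root: str) -> dict:
    """Bytes used by each immediate subdirectory of root (one per cached env)."""
    usage = {}
    root_path = Path(root)
    if not root_path.is_dir():
        return usage
    for entry in root_path.iterdir():
        if not entry.is_dir():
            continue
        total = 0
        for dirpath, _dirnames, filenames in os.walk(entry):
            for name in filenames:
                path = os.path.join(dirpath, name)
                # Skip symlinks so shared files are not counted twice.
                if os.path.islink(path):
                    continue
                try:
                    total += os.path.getsize(path)
                except OSError:
                    pass  # file removed while scanning
        usage[entry.name] = total
    return usage


root = "/tmp/ray/session_latest/runtime_resources/pip"
for env_hash, size in sorted(env_disk_usage(root).items()):
    print(f"{env_hash}: {size / 1e6:.1f} MB")
```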