> Isolate Python dependencies per pipeline stage using Ray runtime environments

# Per-Stage Runtime Environments

Run stages that need different Python package versions within a single pipeline. Each stage can declare a `runtime_env` that tells Ray to create an isolated virtualenv for that stage's workers, so incompatible library versions coexist without conflicts.

## Overview

Some curation pipelines require stages that depend on different versions of the same library. For example, one stage might need `transformers==4.40.0` for a specific model checkpoint, while another stage needs `transformers==4.45.0` for a newer model. Without isolation, these stages cannot run in the same pipeline, because a single Python environment holds only one version of a package at a time.

Per-stage runtime environments solve this by using Ray's native `runtime_env` support. When a stage declares a `runtime_env`, Ray creates and caches an isolated virtualenv under `/tmp/ray/session_latest/runtime_resources/pip/<hash>/virtualenv`. Each unique dependency set gets its own cached virtualenv, reused for the lifetime of the Ray session. No driver-side virtualenv creation or `PYTHONPATH` manipulation is needed.
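
To estimate how many virtualenvs a pipeline will create, it can help to count the distinct `runtime_env` specifications across its stages. The sketch below is illustrative only: `env_key` and `count_distinct_envs` are hypothetical helpers, and the SHA-1 of the JSON form is a stand-in for whatever hash Ray computes internally, which may treat specifications differently (for example, with respect to package order).

```python
import hashlib
import json


def env_key(runtime_env: dict) -> str:
    """Return a stable key for a runtime_env dict.

    Illustrative stand-in only; this is NOT the hash Ray uses for its cache.
    """
    canonical = json.dumps(runtime_env, sort_keys=True)
    return hashlib.sha1(canonical.encode("utf-8")).hexdigest()[:12]


def count_distinct_envs(runtime_envs: list) -> int:
    """Count distinct non-empty specs; None or {} means the base environment."""
    return len({env_key(env) for env in runtime_envs if env})


# One entry per stage: the first stage runs in the base environment.
specs = [
    None,
    {"pip": ["transformers==4.40.0"]},
    {"pip": ["transformers==4.45.0"]},
    {"pip": ["transformers==4.40.0"]},  # same spec as the second stage
]
print(count_distinct_envs(specs))
```

Under these assumptions, the second and fourth stages share one environment, so the four stages need two isolated virtualenvs in addition to the base environment.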

## Usage

### Declare dependencies on a stage class

Set `runtime_env` as a class variable on your `ProcessingStage` subclass:

```python
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import DocumentBatch

class MyStage(ProcessingStage[DocumentBatch, DocumentBatch]):
    name = "my_stage"
    runtime_env = {"pip": ["transformers==4.40.0"]}

    def inputs(self):
        return ["data"], []

    def outputs(self):
        return ["data"], []

    def process(self, task):
        import transformers
        # This worker sees transformers 4.40.0
        ...
```

### Override at instantiation time

Use `with_()` to change the runtime environment for a specific pipeline without modifying the stage class:

```python
stage = MyStage().with_(runtime_env={"pip": ["transformers==4.45.0"]})
```

### Use `uv` as the package installer

Ray also supports `uv` as the package installer inside worker virtualenvs. Use the `"uv"` key instead of `"pip"` for faster installs:

```python
class FastInstallStage(ProcessingStage[DocumentBatch, DocumentBatch]):
    name = "fast_install_stage"
    runtime_env = {"uv": ["transformers==4.40.0"]}
    ...
```

Both `"pip"` and `"uv"` keys work regardless of which package manager your local environment uses. The key only controls which installer Ray uses inside the worker virtualenv.

## Backend Support

Per-stage runtime environments work with all three execution backends:

| Backend                    | How `runtime_env` is applied                                                                                                           |
| -------------------------- | -------------------------------------------------------------------------------------------------------------------------------------- |
| **`XennaExecutor`**        | A `CuratorRuntimeEnv` adapter bridges the stage's `runtime_env` dict to Xenna's `env_info` interface, which forwards it to Ray actors. |
| **`RayDataExecutor`**      | The `runtime_env` is passed through `ray_remote_args` to `map_batches`, so each stage's Ray tasks run in the correct virtualenv.       |
| **`RayActorPoolExecutor`** | The `runtime_env` is passed to each actor's `.options()` at pool creation time.                                                        |

## Behavior

* **Additive isolation**: Isolated virtualenvs are cloned from the base environment. Packages installed in the base environment (such as NeMo Curator and its dependencies) remain importable in isolated workers unless explicitly overridden.
* **Caching**: Ray caches each unique `runtime_env` specification. The first task dispatched to a new `runtime_env` triggers virtualenv creation; subsequent tasks reuse the cached environment.
* **No `runtime_env`**: Stages that do not set `runtime_env` (the default) run in the base Python environment with no isolation overhead.
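
One way to confirm which package version a worker actually sees is to query the installed distribution metadata from inside the stage. The following is a minimal sketch using only the standard library; `visible_version` is a hypothetical helper name, not part of NeMo Curator or Ray.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def visible_version(distribution: str) -> Optional[str]:
    """Return the version of an installed distribution, or None if it is absent.

    The lookup uses the current process's import path, so the answer reflects
    whichever environment the calling worker is running in.
    """
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None
```

Calling `visible_version("transformers")` inside a stage's `process` method and logging the result should show the pinned version for isolated stages and the base environment's version for stages without a `runtime_env`.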

## Container Setup

The NeMo Curator container image creates its virtualenv with `uv venv --seed`, which ensures that `pip` is available inside the venv. This is required because Ray's pip-based `runtime_env` plugin clones the current virtualenv and needs `pip` to install stage-specific packages in the clone.

If you are running outside the official container, make sure your Python environment has `pip` available:

```bash
uv venv --seed
uv sync --locked
```
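
Before launching a pipeline outside the official container, a quick preflight check can confirm that `pip` is importable from the active interpreter. This is a sketch using only the standard library; `pip_available` and `in_virtualenv` are hypothetical helper names.

```python
import importlib.util
import sys


def pip_available() -> bool:
    """Return True if the `pip` module can be found by the current interpreter."""
    return importlib.util.find_spec("pip") is not None


def in_virtualenv() -> bool:
    """Return True if the interpreter is running inside a virtual environment."""
    return sys.prefix != sys.base_prefix


if __name__ == "__main__":
    print(f"virtualenv active: {in_virtualenv()}")
    print(f"pip importable:    {pip_available()}")
```

If `pip_available()` returns `False` inside a `uv`-managed environment, recreating the environment with `uv venv --seed` is the fix described above.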

## Example: Multi-Version Pipeline

This example runs three stages in a single pipeline, each of which can see a different version of the `packaging` library: the base environment's version, 23.2, and 24.0. It uses `RecordPackagingVersionStage`, a test stage from PR #1623 that records the `packaging` library version visible to each worker:

```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.resources import Resources
from nemo_curator.tasks import DocumentBatch


class RecordPackagingVersionStage(ProcessingStage[DocumentBatch, DocumentBatch]):
    """Records the packaging library version visible to this worker."""

    name = "record_packaging_version"
    resources = Resources(cpus=0.5)
    batch_size = 1

    def inputs(self):
        return ["data"], []

    def outputs(self):
        return ["data"], []

    def process(self, task):
        import packaging

        batch = task.to_pandas().copy()
        batch[f"{self.name}_version"] = packaging.__version__
        return DocumentBatch(
            task_id=task.task_id,
            dataset_name=task.dataset_name,
            data=batch,
            _metadata=task._metadata,
            _stage_perf=task._stage_perf,
        )


pipeline = Pipeline(name="multi_version_demo")

# Stage 1: uses the base environment (no runtime_env)
pipeline.add_stage(RecordPackagingVersionStage().with_(name="base_env"))

# Stage 2: needs packaging 23.2 (installed via pip)
pipeline.add_stage(
    RecordPackagingVersionStage()
    .with_(name="pinned_v232", runtime_env={"pip": ["packaging==23.2"]})
)

# Stage 3: needs packaging 24.0 (installed via uv)
pipeline.add_stage(
    RecordPackagingVersionStage()
    .with_(name="pinned_v240", runtime_env={"uv": ["packaging==24.0"]})
)

executor = XennaExecutor(config={"execution_mode": "streaming"})
results = pipeline.run(executor)
```

## Limitations

* **First-task latency**: The first task dispatched to a stage with a new `runtime_env` incurs virtualenv creation time. Subsequent tasks reuse the cached environment.
* **Disk usage**: Each unique `runtime_env` creates a separate virtualenv on each worker node. Monitor disk space under `/tmp/ray/` for large clusters with many distinct environments.
* **Session scope**: Cached virtualenvs are tied to the Ray session. Restarting Ray clears the cache.
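
To keep an eye on the disk usage noted above, a small script can sum the size of the runtime environment cache on a node. This is a rough sketch that assumes the cache lives under `/tmp/ray/session_latest/runtime_resources`, as in the path shown in the Overview; `dir_size_bytes` and `free_fraction` are hypothetical helpers, and a cluster configured with a different Ray temp directory would need a different path.

```python
import os
import shutil

# Assumed location, derived from the cache path shown in the Overview.
RUNTIME_RESOURCES = "/tmp/ray/session_latest/runtime_resources"


def dir_size_bytes(root: str) -> int:
    """Sum the sizes of regular files under root; returns 0 if root is missing."""
    total = 0
    for dirpath, _dirnames, filenames in os.walk(root):
        for filename in filenames:
            path = os.path.join(dirpath, filename)
            if os.path.islink(path):
                continue  # skip symlinks to avoid double-counting targets
            try:
                total += os.path.getsize(path)
            except OSError:
                pass  # file vanished or is unreadable; ignore it
    return total


def free_fraction(path: str) -> float:
    """Return the fraction of free space on the filesystem containing path."""
    usage = shutil.disk_usage(path)
    return usage.free / usage.total if usage.total else 0.0


if __name__ == "__main__":
    used_mib = dir_size_bytes(RUNTIME_RESOURCES) / 2**20
    print(f"runtime_resources: {used_mib:.1f} MiB")
    print(f"free space on /tmp: {free_fraction('/tmp'):.1%}")
```

The script reports only the node it runs on, so it would need to run on each worker node to cover the whole cluster.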