nemo_curator.backends.xenna.adapter

Module Contents

Classes

| Name | Description |
| --- | --- |
| CuratorRuntimeEnv | Duck-typed replacement for Xenna’s RuntimeEnv that supports the full Ray runtime_env dict. |
| XennaStageAdapter | Adapts ProcessingStage to Xenna. |

Functions

| Name | Description |
| --- | --- |
| create_named_xenna_stage_adapter | Creates a dynamic XennaStageAdapter subclass named after the wrapped stage, so that Xenna shows the original stage name. |

API

class nemo_curator.backends.xenna.adapter.CuratorRuntimeEnv(
runtime_env: dict[str, typing.Any]
)

Duck-typed replacement for Xenna’s RuntimeEnv that supports the full Ray runtime_env dict.

Xenna’s RuntimeEnv only supports conda + env_vars. This class accepts a raw Ray-format runtime_env dict and implements the interface Xenna calls: to_ray_runtime_env(), format(), and extra_env_vars.

extra_env_vars
dict[str, str] = dict(runtime_env.get('env_vars', {}))

nemo_curator.backends.xenna.adapter.CuratorRuntimeEnv.format() -> str

nemo_curator.backends.xenna.adapter.CuratorRuntimeEnv.to_ray_runtime_env() -> ray.runtime_env.RuntimeEnv

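
The duck-typing idea described for CuratorRuntimeEnv can be sketched roughly as follows. This is a minimal illustration, not the library's implementation: `SketchRuntimeEnv` is a hypothetical name, the output of `format()` is an assumed layout, and `to_ray_runtime_env()` returns a plain dict here (the real method returns `ray.runtime_env.RuntimeEnv`) so that the example runs without Ray installed.

```python
from typing import Any


class SketchRuntimeEnv:
    """Hypothetical stand-in for CuratorRuntimeEnv (illustration only)."""

    def __init__(self, runtime_env: dict[str, Any]) -> None:
        # Keep a shallow copy of the raw Ray-format dict.
        self._runtime_env = dict(runtime_env)
        # Mirrors the documented default for extra_env_vars.
        self.extra_env_vars: dict[str, str] = dict(runtime_env.get("env_vars", {}))

    def format(self) -> str:
        # Assumed behaviour: a one-line summary listing the configured keys.
        keys = ", ".join(sorted(self._runtime_env))
        return f"SketchRuntimeEnv({keys})"

    def to_ray_runtime_env(self) -> dict[str, Any]:
        # Assumption: a plain dict replaces ray.runtime_env.RuntimeEnv here.
        return dict(self._runtime_env)


env = SketchRuntimeEnv(
    {"pip": ["numpy"], "env_vars": {"TOKENIZERS_PARALLELISM": "false"}}
)
```

Because the caller only relies on the two methods and the one attribute, any object exposing them can stand in for the original class, which is what lets keys such as `pip` pass through untouched.
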
class nemo_curator.backends.xenna.adapter.XennaStageAdapter(
processing_stage: nemo_curator.stages.base.ProcessingStage
)

Bases: BaseStageAdapter, Stage

Adapts ProcessingStage to Xenna.

Args:
    processing_stage: ProcessingStage to adapt.

env_info
RuntimeEnv | None

Runtime environment for this stage.

Converts the ProcessingStage.runtime_env dict (Ray-format) to a CuratorRuntimeEnv that Xenna can forward to Ray actors.

required_resources
XennaResources

Get the resources required for this stage.

stage_batch_size
int

Get the batch size for this stage.

nemo_curator.backends.xenna.adapter.XennaStageAdapter.process_data(
tasks: list[nemo_curator.tasks.Task]
) -> list[nemo_curator.tasks.Task] | None

Process a batch of tasks with automatic performance tracking.

Args:
    tasks: List of tasks to process.

Returns:
    List of processed tasks, or None.
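
One way to picture "automatic performance tracking" around a batch call is the sketch below. It is an assumption-laden illustration, not the adapter's actual code: `SketchTask`, `process_with_timing`, and the recorded fields (`stage`, `seconds`, `batch_size`) are all hypothetical.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class SketchTask:
    """Hypothetical task type standing in for nemo_curator.tasks.Task."""
    data: str
    perf: list = field(default_factory=list)


def process_with_timing(
    process_batch: Callable[[list], Optional[list]],
    tasks: list,
    stage_name: str,
) -> Optional[list]:
    # Time the wrapped batch call with a monotonic clock.
    start = time.perf_counter()
    results = process_batch(tasks)
    elapsed = time.perf_counter() - start
    if results is None:
        return None
    # Attach one timing record per output task (assumed record layout).
    for task in results:
        task.perf.append(
            {"stage": stage_name, "seconds": elapsed, "batch_size": len(tasks)}
        )
    return results


def upper_stage(batch: list) -> list:
    # Toy stage: upper-case each task's payload.
    return [SketchTask(t.data.upper(), list(t.perf)) for t in batch]


out = process_with_timing(upper_stage, [SketchTask("a"), SketchTask("b")], "upper")
```
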

nemo_curator.backends.xenna.adapter.XennaStageAdapter.setup(
worker_metadata: cosmos_xenna.pipelines.private.resources.WorkerMetadata
) -> None

Set up the stage per worker (Xenna-specific signature).

Xenna calls this method with its own types. They are converted to the generic types, and the call is delegated to the base adapter.

Args:
    worker_metadata: Xenna’s WorkerMetadata object.

nemo_curator.backends.xenna.adapter.XennaStageAdapter.setup_on_node(
node_info: cosmos_xenna.pipelines.private.resources.NodeInfo,
worker_metadata: cosmos_xenna.pipelines.private.resources.WorkerMetadata
) -> None

Set up the stage on a node (Xenna-specific signature).

Xenna calls this method with its own types. They are converted to the generic types, and the call is delegated to the base adapter.

Args:
    node_info: Xenna’s NodeInfo object.
    worker_metadata: Xenna’s WorkerMetadata object.
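
The convert-then-delegate pattern used by setup() and setup_on_node() might look roughly like this. Every name below is hypothetical: the two metadata dataclasses stand in for the backend-specific and generic types, and their fields (`worker_id`, `allocation`) are assumptions made for the sketch, not the real Xenna or NeMo Curator definitions.

```python
from dataclasses import dataclass


@dataclass
class BackendWorkerMetadata:
    """Hypothetical backend-specific type (stands in for Xenna's WorkerMetadata)."""
    worker_id: str
    allocation: dict


@dataclass
class GenericWorkerMetadata:
    """Hypothetical backend-agnostic type that the wrapped stage understands."""
    worker_id: str
    allocation: dict


class RecordingStage:
    """Toy stage that remembers what setup() received."""

    def __init__(self) -> None:
        self.seen = None

    def setup(self, worker_metadata: GenericWorkerMetadata) -> None:
        self.seen = worker_metadata


class SketchStageAdapter:
    """Accepts the backend's type and delegates using the generic type."""

    def __init__(self, stage: RecordingStage) -> None:
        self.stage = stage

    def setup(self, worker_metadata: BackendWorkerMetadata) -> None:
        # Convert the backend-specific object to the generic one, then delegate.
        generic = GenericWorkerMetadata(
            worker_id=worker_metadata.worker_id,
            allocation=dict(worker_metadata.allocation),
        )
        self.stage.setup(generic)


stage = RecordingStage()
SketchStageAdapter(stage).setup(BackendWorkerMetadata("w0", {"cpus": 2}))
```

Keeping the conversion inside the adapter means the wrapped stage never needs to import backend-specific types.
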

nemo_curator.backends.xenna.adapter.create_named_xenna_stage_adapter(
stage: nemo_curator.stages.base.ProcessingStage
) -> nemo_curator.backends.xenna.adapter.XennaStageAdapter

When a pipeline runs in Xenna, each stage is wrapped in XennaStageAdapter, so the stage name would be shown as XennaStageAdapter. To avoid this, a dynamic subclass is created with the original stage’s name. This ensures that when Xenna calls type(adapter).__name__, it returns the original stage’s class name rather than ‘XennaStageAdapter’.

Args:
    stage (ProcessingStage): ProcessingStage to adapt.

Returns:
    XennaStageAdapter: XennaStageAdapter instance with the wrapped stage’s class name.
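
The dynamic-subclass technique can be illustrated with Python's three-argument `type()` built-in. This is a minimal sketch under stated assumptions, not the function's actual body: `SketchNamedAdapter`, `WordCountStage`, and `create_named_adapter` are hypothetical names.

```python
class SketchNamedAdapter:
    """Hypothetical stand-in for XennaStageAdapter."""

    def __init__(self, stage: object) -> None:
        self.stage = stage


class WordCountStage:
    """Hypothetical user-defined stage."""


def create_named_adapter(stage: object) -> SketchNamedAdapter:
    # Build a subclass of the adapter whose __name__ is the stage's class name.
    stage_name = type(stage).__name__
    named_cls = type(stage_name, (SketchNamedAdapter,), {})
    return named_cls(stage)


adapter = create_named_adapter(WordCountStage())
print(type(adapter).__name__)
```

The returned object is still an instance of the adapter class, so callers that check `isinstance` keep working, while anything that reads the class name sees the wrapped stage's name.
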