nemo_curator.backends.xenna.adapter

View as Markdown

Module Contents

Classes

NameDescription
XennaStageAdapterAdapts ProcessingStage to Xenna.

Functions

NameDescription
create_named_xenna_stage_adapterWhen we run a pipeline in Xenna, since we wrap using XennaStageAdapter,

API

class nemo_curator.backends.xenna.adapter.XennaStageAdapter(
processing_stage: nemo_curator.stages.base.ProcessingStage
)

Bases: BaseStageAdapter, Stage

Adapts ProcessingStage to Xenna. Args: stage: ProcessingStage to adapt

env_info
RuntimeEnv | None

Runtime environment for this stage.

required_resources
XennaResources

Get the resources required for this stage.

stage_batch_size
int

Get the batch size for this stage.

nemo_curator.backends.xenna.adapter.XennaStageAdapter.process_data(
tasks: list[nemo_curator.tasks.Task]
) -> list[nemo_curator.tasks.Task] | None

Process batch of tasks with automatic performance tracking. Args: tasks: List of tasks to process Returns: List of processed tasks or None

nemo_curator.backends.xenna.adapter.XennaStageAdapter.setup(
worker_metadata: cosmos_xenna.pipelines.private.resources.WorkerMetadata
) -> None

Setup the stage per worker - Xenna-specific signature. This method is called by Xenna with its specific types. We convert them to our generic types and delegate to the base adapter. Args: worker_metadata: Xenna’s WorkerMetadata object

nemo_curator.backends.xenna.adapter.XennaStageAdapter.setup_on_node(
node_info: cosmos_xenna.pipelines.private.resources.NodeInfo,
worker_metadata: cosmos_xenna.pipelines.private.resources.WorkerMetadata
) -> None

Setup the stage on a node - Xenna-specific signature. This method is called by Xenna with its specific types. We convert them to our generic types and delegate to the base adapter. Args: node_info: Xenna’s NodeInfo object worker_metadata: Xenna’s WorkerMetadata object

nemo_curator.backends.xenna.adapter.create_named_xenna_stage_adapter(
stage: nemo_curator.stages.base.ProcessingStage
) -> nemo_curator.backends.xenna.adapter.XennaStageAdapter

When we run a pipeline in Xenna, since we wrap using XennaStageAdapter, the stage name is shown as XennaStageAdapter. This is not what we want. So we create a dynamic subclass with the original stage’s name. This ensures that when Xenna calls type(adapter).name, it returns the original stage’s class name rather than ‘XennaStageAdapter’. Args: stage (ProcessingStage): ProcessingStage to adapt

Returns: XennaStageAdapter

XennaStageAdapter instance with the wrapped stage’s class name