backends.xenna.adapter#

Module Contents#

Classes#

XennaStageAdapter

Adapts ProcessingStage to Xenna. Args: stage: ProcessingStage to adapt

Functions#

create_named_xenna_stage_adapter

When we run a pipeline in Xenna, since we wrap using XennaStageAdapter, the stage name is shown as XennaStageAdapter. This is not what we want. So we create a dynamic subclass with the original stage’s name. This ensures that when Xenna calls type(adapter).name, it returns the original stage’s class name rather than ‘XennaStageAdapter’. Args: stage (ProcessingStage): ProcessingStage to adapt

API#

class backends.xenna.adapter.XennaStageAdapter(
processing_stage: nemo_curator.stages.base.ProcessingStage,
)#

Bases: nemo_curator.backends.base.BaseStageAdapter, cosmos_xenna.pipelines.v1.Stage

Adapts ProcessingStage to Xenna. Args: stage: ProcessingStage to adapt

Initialization

property env_info: cosmos_xenna.pipelines.v1.RuntimeEnv | None#

Runtime environment for this stage.

process_data(
tasks: list[nemo_curator.tasks.Task],
) list[nemo_curator.tasks.Task] | None#

Process batch of tasks with automatic performance tracking. Args: tasks: List of tasks to process Returns: List of processed tasks or None

property required_resources: cosmos_xenna.ray_utils.resources.Resources#

Get the resources required for this stage.

setup(
worker_metadata: cosmos_xenna.ray_utils.resources.WorkerMetadata,
) None#

Setup the stage per worker - Xenna-specific signature. This method is called by Xenna with its specific types. We convert them to our generic types and delegate to the base adapter. Args: worker_metadata: Xenna’s WorkerMetadata object

setup_on_node(
node_info: cosmos_xenna.ray_utils.resources.NodeInfo,
worker_metadata: cosmos_xenna.ray_utils.resources.WorkerMetadata,
) None#

Setup the stage on a node - Xenna-specific signature. This method is called by Xenna with its specific types. We convert them to our generic types and delegate to the base adapter. Args: node_info: Xenna’s NodeInfo object worker_metadata: Xenna’s WorkerMetadata object

property stage_batch_size: int#

Get the batch size for this stage.

backends.xenna.adapter.create_named_xenna_stage_adapter(
stage: nemo_curator.stages.base.ProcessingStage,
) backends.xenna.adapter.XennaStageAdapter#

When we run a pipeline in Xenna, since we wrap using XennaStageAdapter, the stage name is shown as XennaStageAdapter. This is not what we want. So we create a dynamic subclass with the original stage’s name. This ensures that when Xenna calls type(adapter).name, it returns the original stage’s class name rather than ‘XennaStageAdapter’. Args: stage (ProcessingStage): ProcessingStage to adapt

Returns: XennaStageAdapter: XennaStageAdapter instance with the wrapped stage’s class name