backends.xenna.adapter
Module Contents
Classes
XennaStageAdapter | Adapts ProcessingStage to Xenna. Args: processing_stage: ProcessingStage to adapt |
Functions
create_named_xenna_stage_adapter | When a pipeline runs in Xenna, each stage is wrapped in XennaStageAdapter, so the stage name is shown as XennaStageAdapter instead of the original stage's name. To avoid this, the function creates a dynamic subclass named after the original stage's class. When Xenna calls type(adapter).__name__, it then gets the original stage's class name rather than 'XennaStageAdapter'. Args: stage (ProcessingStage): ProcessingStage to adapt |
API
- class backends.xenna.adapter.XennaStageAdapter(
- processing_stage: nemo_curator.stages.base.ProcessingStage,
- )
Bases: nemo_curator.backends.base.BaseStageAdapter, cosmos_xenna.pipelines.v1.Stage
Adapts ProcessingStage to Xenna.
Args: processing_stage: ProcessingStage to adapt
Initialization
- property env_info: cosmos_xenna.pipelines.v1.RuntimeEnv | None
Runtime environment for this stage.
- process_data(
- tasks: list[nemo_curator.tasks.Task],
- )
Process a batch of tasks with automatic performance tracking.
Args: tasks: List of tasks to process
Returns: List of processed tasks, or None
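As an illustration of what "automatic performance tracking" around a batch call can look like, here is a minimal sketch. It does not reproduce the adapter's actual implementation: `TimedAdapter`, its `timings` list, and the recorded fields are hypothetical names chosen for this example, and the only assumption taken from the description above is that a batch of tasks goes in and a list of results comes out.

```python
import time


class TimedAdapter:
    """Hypothetical wrapper that times each batch; not the real XennaStageAdapter."""

    def __init__(self, process_batch):
        # process_batch: any callable that maps a list of tasks to a list of results.
        self._process_batch = process_batch
        self.timings = []  # one record per processed batch (hypothetical format)

    def process_data(self, tasks):
        # Measure wall-clock time around the wrapped call with a monotonic clock.
        start = time.perf_counter()
        results = self._process_batch(tasks)
        elapsed = time.perf_counter() - start
        self.timings.append({"num_tasks": len(tasks), "seconds": elapsed})
        return results


# Usage: wrap a toy "stage" that upper-cases string tasks.
adapter = TimedAdapter(lambda tasks: [t.upper() for t in tasks])
out = adapter.process_data(["a", "b"])
print(out)
print(adapter.timings[0]["num_tasks"])
```

The caller only sees `process_data`; the timing happens as a side effect, which is the sense in which the tracking is "automatic".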
- property required_resources: cosmos_xenna.ray_utils.resources.Resources
Get the resources required for this stage.
- setup(
- worker_metadata: cosmos_xenna.ray_utils.resources.WorkerMetadata,
- )
Set up the stage on each worker (Xenna-specific signature). Xenna calls this method with its own types. The adapter converts them to the generic types and delegates to the base adapter.
Args: worker_metadata: Xenna's WorkerMetadata object
- setup_on_node(
- node_info: cosmos_xenna.ray_utils.resources.NodeInfo,
- worker_metadata: cosmos_xenna.ray_utils.resources.WorkerMetadata,
- )
Set up the stage on a node (Xenna-specific signature). Xenna calls this method with its own types. The adapter converts them to the generic types and delegates to the base adapter.
Args: node_info: Xenna's NodeInfo object; worker_metadata: Xenna's WorkerMetadata object
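The "convert, then delegate" pattern used by setup and setup_on_node can be sketched as follows. Every class and field here is a hypothetical stand-in: the real Xenna WorkerMetadata and NodeInfo types, and the generic types they are converted to, are not reproduced, and their actual fields may differ.

```python
from dataclasses import dataclass


@dataclass
class BackendWorkerMetadata:
    """Stand-in for a backend-specific worker object (hypothetical fields)."""
    worker_id: str
    allocation: dict


@dataclass
class GenericWorkerMetadata:
    """Stand-in for a backend-agnostic worker description (hypothetical fields)."""
    worker_id: str
    allocation: dict


class BaseAdapter:
    """Stand-in base adapter that only understands the generic types."""

    def __init__(self, stage):
        self.stage = stage

    def setup(self, worker_metadata):
        self.stage.setup(worker_metadata)


class BackendAdapter(BaseAdapter):
    """Accepts the backend's type, converts it, then delegates to the base class."""

    def setup(self, worker_metadata: BackendWorkerMetadata) -> None:
        generic = GenericWorkerMetadata(
            worker_id=worker_metadata.worker_id,
            allocation=dict(worker_metadata.allocation),  # copy, do not share state
        )
        super().setup(generic)


class RecordingStage:
    """Toy stage that records what it was set up with."""

    def __init__(self):
        self.seen = None

    def setup(self, worker_metadata):
        self.seen = worker_metadata


stage = RecordingStage()
BackendAdapter(stage).setup(BackendWorkerMetadata("worker-0", {"cpus": 2}))
print(type(stage.seen).__name__)
```

Keeping the conversion in the adapter means the wrapped stage never has to import or know about the backend's types.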
- property stage_batch_size: int
Get the batch size for this stage.
- backends.xenna.adapter.create_named_xenna_stage_adapter(
- stage: nemo_curator.stages.base.ProcessingStage,
- )
When a pipeline runs in Xenna, each stage is wrapped in XennaStageAdapter, so the stage name is shown as XennaStageAdapter instead of the original stage's name. To avoid this, the function creates a dynamic subclass named after the original stage's class. When Xenna calls type(adapter).__name__, it then gets the original stage's class name rather than 'XennaStageAdapter'.
Args: stage (ProcessingStage): ProcessingStage to adapt
Returns: XennaStageAdapter: XennaStageAdapter instance with the wrapped stage's class name
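The dynamic-subclass technique described here can be sketched with Python's three-argument type() call. This is a minimal illustration under stated assumptions, not the library's code: `ProcessingStage`, `StageAdapter`, `create_named_stage_adapter`, and `WordCountStage` are hypothetical stand-ins defined locally so the example runs without NeMo Curator or Xenna installed.

```python
class ProcessingStage:
    """Stand-in for nemo_curator.stages.base.ProcessingStage (hypothetical)."""


class StageAdapter:
    """Stand-in for XennaStageAdapter, with no Xenna dependency (hypothetical)."""

    def __init__(self, processing_stage: ProcessingStage) -> None:
        self.processing_stage = processing_stage


def create_named_stage_adapter(stage: ProcessingStage) -> StageAdapter:
    """Wrap `stage` in an adapter subclass named after the stage's own class."""
    stage_name = type(stage).__name__
    # type(name, bases, namespace) builds a new class at runtime.
    named_cls = type(stage_name, (StageAdapter,), {})
    return named_cls(stage)


class WordCountStage(ProcessingStage):
    """Example user-defined stage (hypothetical)."""


adapter = create_named_stage_adapter(WordCountStage())
print(type(adapter).__name__)             # WordCountStage
print(isinstance(adapter, StageAdapter))  # True
```

Because the generated class still inherits from the adapter, it behaves like any other adapter instance. Only the name reported by type(adapter).__name__ changes.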