nemo_curator.tasks.utils

Module Contents

Classes

| Name | Description |
| --- | --- |
| TaskPerfUtils | Utilities for aggregating stage performance metrics from tasks. |

API

class nemo_curator.tasks.utils.TaskPerfUtils()

Utilities for aggregating stage performance metrics from tasks.

Example output format: {"StageA": {"process_time": np.array([…]), "actor_idle_time": np.array([…]), "read_time_s": np.array([…]), …}, "StageB": {"process_time": np.array([…]), …}}

nemo_curator.tasks.utils.TaskPerfUtils._normalize_pipeline_tasks(
tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None
) -> dict[str, list[nemo_curator.tasks.tasks.Task]]
staticmethod

Return a mapping of pipeline name -> list of tasks from various input shapes.
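The idea of accepting several input shapes and returning one mapping can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: `FakeRunResult`, its `pipeline_tasks` attribute, and the `"default"` key used for a bare list are all hypothetical stand-ins.

```python
from collections.abc import Mapping
from dataclasses import dataclass, field


@dataclass
class FakeRunResult:
    """Hypothetical stand-in for WorkflowRunResult; the attribute name is assumed."""
    pipeline_tasks: dict = field(default_factory=dict)


def normalize_pipeline_tasks(tasks) -> dict:
    """Coerce each accepted input shape into pipeline name -> list of tasks."""
    if tasks is None:
        return {}
    if isinstance(tasks, FakeRunResult):
        return {name: list(items) for name, items in tasks.pipeline_tasks.items()}
    if isinstance(tasks, Mapping):
        return {name: list(items) for name, items in tasks.items()}
    # A bare list carries no pipeline name; "default" is an assumed placeholder key.
    return {"default": list(tasks)}


normalized = normalize_pipeline_tasks(["task_1", "task_2"])
```

Funnelling every shape through one helper lets the public methods iterate over a single structure instead of branching on the input type.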

nemo_curator.tasks.utils.TaskPerfUtils.aggregate_task_metrics(
tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None,
prefix: str | None = None
) -> dict[str, typing.Any]
staticmethod

Aggregate task metrics by computing mean/std/sum.
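A rough sketch of what mean/std/sum aggregation over per-stage metrics could look like. The key format (`prefix_stage_metric_stat`), the use of population standard deviation, and the helper name `aggregate_metrics` are assumptions made for illustration; the actual keys returned by `aggregate_task_metrics` are not specified here. Plain lists stand in for NumPy arrays so the example runs with the standard library only.

```python
import statistics
from typing import Optional


def aggregate_metrics(stage_metrics: dict, prefix: Optional[str] = None) -> dict:
    """Flatten stage -> metric -> values into sum/mean/std entries."""
    out = {}
    for stage, metrics in stage_metrics.items():
        for metric, values in metrics.items():
            if not values:
                continue  # nothing to aggregate for this metric
            # Hypothetical key layout: optional prefix, stage name, metric name.
            base = "_".join(part for part in (prefix, stage, metric) if part)
            out[f"{base}_sum"] = sum(values)
            out[f"{base}_mean"] = statistics.fmean(values)
            out[f"{base}_std"] = statistics.pstdev(values)  # population std (assumed)
    return out


summary = aggregate_metrics({"StageA": {"process_time": [1.0, 3.0]}}, prefix="run1")
```

With these two samples, `summary` holds a sum of 4.0, a mean of 2.0, and a standard deviation of 1.0 under the `run1_StageA_process_time_*` keys.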

nemo_curator.tasks.utils.TaskPerfUtils.collect_stage_metrics(
tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None
) -> dict[str, dict[str, numpy.ndarray[float]]]
staticmethod

Collect per-stage metric lists from tasks or workflow outputs.

The returned mapping aggregates both built-in StagePerfStats metrics and any custom_stats recorded by stages.

Parameters:

tasks
list[Task] | WorkflowRunResult | Mapping[str, list[Task]] | None

A list of Task objects, a WorkflowRunResult, or a mapping of pipeline_name -> list[Task].

Returns: dict[str, dict[str, np.ndarray[float]]]

Dict mapping stage_name -> metric_name -> NumPy array of numeric values.
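The collection step can be pictured as grouping each task's per-stage records by stage name and appending every metric to a per-metric list. The sketch below assumes each task carries a list of per-stage records; `FakeStagePerf`, `FakeTask`, and the `stage_perf` attribute are hypothetical stand-ins rather than the real `Task` and `StagePerfStats` classes, and plain lists are used in place of NumPy arrays.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class FakeStagePerf:
    """Hypothetical stand-in for StagePerfStats; field names are assumptions."""
    stage_name: str
    process_time: float = 0.0
    actor_idle_time: float = 0.0
    custom_stats: dict = field(default_factory=dict)


@dataclass
class FakeTask:
    """Hypothetical stand-in for Task, holding one perf record per stage run."""
    stage_perf: list = field(default_factory=list)


def collect_stage_metrics(tasks) -> dict:
    """Group built-in and custom metrics by stage name, then by metric name."""
    collected = defaultdict(lambda: defaultdict(list))
    for task in tasks or []:
        for perf in task.stage_perf:
            bucket = collected[perf.stage_name]
            bucket["process_time"].append(float(perf.process_time))
            bucket["actor_idle_time"].append(float(perf.actor_idle_time))
            # Custom stats recorded by a stage sit alongside the built-in ones.
            for name, value in perf.custom_stats.items():
                bucket[name].append(float(value))
    return {stage: dict(metrics) for stage, metrics in collected.items()}


tasks = [
    FakeTask([FakeStagePerf("StageA", 0.5, 0.1, {"read_time_s": 0.2})]),
    FakeTask([FakeStagePerf("StageA", 1.5), FakeStagePerf("StageB", 2.0)]),
]
metrics = collect_stage_metrics(tasks)
```

Here `metrics["StageA"]["process_time"]` ends up with one entry per task that passed through StageA, which matches the shape of the example output format described for the class.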

nemo_curator.tasks.utils.TaskPerfUtils.get_aggregated_stage_stat(
tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None,
stage_prefix: str,
stat: str
) -> float
staticmethod

Get an aggregated stat for stages matching a name prefix.

Sums the requested statistic over every stage whose name starts with the given prefix, across all tasks.

Parameters:

tasks
list[Task] | WorkflowRunResult | Mapping[str, list[Task]] | None

A list of Task objects, a WorkflowRunResult, or a mapping of pipeline_name -> list[Task]

stage_prefix
str

Match stages whose name starts with this prefix.

stat
str

The stat to extract (e.g., "num_items_processed", "process_time").

Returns: float

The aggregated stat value, or 0.0 if no stages match the prefix.
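The prefix-matching sum described above can be sketched like this. `FakeStagePerf`, `FakeTask`, and the `stage_perf` attribute are hypothetical stand-ins for the real task and perf-stat classes, and the function is a simplified model of the documented behavior rather than the library's code.

```python
from dataclasses import dataclass


@dataclass
class FakeStagePerf:
    """Hypothetical stand-in for a per-stage performance record."""
    stage_name: str
    num_items_processed: int = 0
    process_time: float = 0.0


@dataclass
class FakeTask:
    """Hypothetical stand-in for Task, holding its per-stage records."""
    stage_perf: list


def get_aggregated_stage_stat(tasks, stage_prefix: str, stat: str) -> float:
    """Sum `stat` over all stage records whose name starts with `stage_prefix`."""
    total = 0.0
    for task in tasks or []:
        for perf in task.stage_perf:
            if perf.stage_name.startswith(stage_prefix):
                total += float(getattr(perf, stat, 0.0))
    return total


tasks = [
    FakeTask([FakeStagePerf("reader_a", 10, 1.5), FakeStagePerf("writer", 10, 0.5)]),
    FakeTask([FakeStagePerf("reader_b", 5, 0.25)]),
]
items_read = get_aggregated_stage_stat(tasks, "reader", "num_items_processed")
```

Both `reader_a` and `reader_b` match the `"reader"` prefix, so `items_read` is 15.0, while a prefix that matches no stage yields 0.0.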