---
layout: overview
slug: nemo-curator/nemo_curator/tasks/utils
title: nemo_curator.tasks.utils
---
## Module Contents
### Classes
| Name | Description |
| ---------------------------------------------------------- | --------------------------------------------------------------- |
| [`TaskPerfUtils`](#nemo_curator-tasks-utils-TaskPerfUtils) | Utilities for aggregating stage performance metrics from tasks. |
### API
```python
class nemo_curator.tasks.utils.TaskPerfUtils()
```
Utilities for aggregating stage performance metrics from tasks.
Example output format:
\{
"StageA": \{"process\_time": np.array(\[...]), "actor\_idle\_time": np.array(\[...]), "read\_time\_s": np.array(\[...]), ...},
"StageB": \{"process\_time": np.array(\[...]), ...}
}
```python
nemo_curator.tasks.utils.TaskPerfUtils._normalize_pipeline_tasks(
tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None
) -> dict[str, list[nemo_curator.tasks.tasks.Task]]
```
staticmethod
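Normalize any accepted input shape (a list of tasks, a `WorkflowRunResult`, a mapping of pipeline name -> list of tasks, or `None`) into a mapping of pipeline name -> list of tasks.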
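
The normalization idea can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's implementation: `FakeRunResult`, its `pipeline_tasks` attribute, the `normalize_tasks` helper, the `"pipeline"` placeholder key, and the choice to map `None` to an empty dict are all hypothetical.

```python
from collections.abc import Mapping
from dataclasses import dataclass, field


@dataclass
class FakeRunResult:
    """Hypothetical stand-in for WorkflowRunResult; the real class may differ."""

    pipeline_tasks: dict = field(default_factory=dict)


def normalize_tasks(tasks, default_name="pipeline"):
    """Coerce the accepted input shapes into {pipeline_name: [task, ...]}."""
    if tasks is None:
        # Assumption: no input means no pipelines.
        return {}
    if isinstance(tasks, FakeRunResult):
        return {name: list(items) for name, items in tasks.pipeline_tasks.items()}
    if isinstance(tasks, Mapping):
        return {name: list(items) for name, items in tasks.items()}
    # A bare list carries no pipeline name, so file it under a placeholder key.
    return {default_name: list(tasks)}
```

Funnelling every input through one normalizer lets the public helpers share a single code path regardless of how the caller holds its tasks.
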
```python
nemo_curator.tasks.utils.TaskPerfUtils.aggregate_task_metrics(
tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None,
prefix: str | None = None
) -> dict[str, typing.Any]
```
staticmethod
Aggregate task metrics by computing the mean, standard deviation, and sum.
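
As a rough sketch of what mean/std/sum aggregation over per-stage metrics looks like, the snippet below reduces a `stage -> metric -> values` mapping to flat summary entries. The `summarize` helper and the `"<stage>_<metric>_<agg>"` key naming are assumptions made for illustration; the keys actually returned by `aggregate_task_metrics` and the effect of `prefix` are not documented here. Plain lists stand in for NumPy arrays, and `statistics.pstdev` computes the population standard deviation, which matches the default behaviour of `np.std`.

```python
import statistics


def summarize(stage_metrics):
    """Reduce {stage: {metric: values}} to flat sum/mean/std entries."""
    summary = {}
    for stage, metrics in stage_metrics.items():
        for metric, values in metrics.items():
            values = [float(v) for v in values]
            if not values:
                # Skip empty metrics rather than dividing by zero.
                continue
            summary[f"{stage}_{metric}_sum"] = sum(values)
            summary[f"{stage}_{metric}_mean"] = statistics.fmean(values)
            summary[f"{stage}_{metric}_std"] = statistics.pstdev(values)
    return summary


result = summarize({"StageA": {"process_time": [1.0, 3.0]}})
```
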
```python
nemo_curator.tasks.utils.TaskPerfUtils.collect_stage_metrics(
tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None
) -> dict[str, dict[str, numpy.ndarray[float]]]
```
staticmethod
Collect per-stage metric lists from tasks or workflow outputs.
The returned mapping aggregates both built-in StagePerfStats metrics and any
custom\_stats recorded by stages.
**Parameters:**

- `tasks`: Iterable of tasks, a workflow result dictionary, or a `WorkflowRunResult`.

**Returns:** `dict[str, dict[str, np.ndarray[float]]]`
Dict mapping stage\_name -> metric\_name -> NumPy array of numeric values.
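
The collection step described above might look roughly like this. It is a sketch under assumptions rather than the library's code: `PerfRecord` and `FakeTask` are hypothetical stand-ins for `StagePerfStats` and `Task`, the `perf` attribute name is invented, only two built-in metrics are shown, and plain lists are used where the real method returns NumPy arrays.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class PerfRecord:
    """Hypothetical stand-in for one StagePerfStats entry."""

    stage_name: str
    process_time: float = 0.0
    actor_idle_time: float = 0.0
    custom_stats: dict = field(default_factory=dict)


@dataclass
class FakeTask:
    """Hypothetical stand-in for a Task carrying per-stage perf records."""

    perf: list = field(default_factory=list)


def collect(tasks):
    """Group built-in and custom metric values by stage name."""
    out = defaultdict(lambda: defaultdict(list))
    for task in tasks:
        for rec in task.perf:
            out[rec.stage_name]["process_time"].append(rec.process_time)
            out[rec.stage_name]["actor_idle_time"].append(rec.actor_idle_time)
            # Custom stats land next to the built-in metrics for the same stage.
            for name, value in rec.custom_stats.items():
                out[rec.stage_name][name].append(float(value))
    return {stage: dict(metrics) for stage, metrics in out.items()}


tasks = [
    FakeTask([PerfRecord("StageA", 1.0, 0.5, {"read_time_s": 0.2})]),
    FakeTask([PerfRecord("StageA", 2.0, 0.1), PerfRecord("StageB", 4.0)]),
]
collected = collect(tasks)
```

Note that a custom stat only gains a value for tasks whose stage recorded it, so metric sequences within one stage can differ in length.
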
```python
nemo_curator.tasks.utils.TaskPerfUtils.get_aggregated_stage_stat(
tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None,
stage_prefix: str,
stat: str
) -> float
```
staticmethod
Get an aggregated stat for stages matching a name prefix.
Sums the requested statistic over all stages whose names start with the given prefix,
across all tasks.
**Parameters:**

- `tasks`: A list of Task objects, a WorkflowRunResult, or a mapping of pipeline\_name -> list\[Task].
- `stage_prefix`: Match stages whose name starts with this prefix.
- `stat`: The stat to extract (e.g., "num\_items\_processed", "process\_time").

**Returns:** `float`
The aggregated stat value, or 0.0 if no matches are found.
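
The prefix-matching sum can be illustrated with a small stand-alone function. This is a sketch, assuming the metrics have already been grouped into a `stage -> metric -> values` mapping; `sum_stat_for_prefix` is a hypothetical helper, not part of `nemo_curator`, and the stage names are made up.

```python
def sum_stat_for_prefix(stage_metrics, stage_prefix, stat):
    """Sum one stat over every stage whose name starts with stage_prefix."""
    total = 0.0
    for stage, metrics in stage_metrics.items():
        if stage.startswith(stage_prefix):
            # Stages that never recorded the stat contribute nothing.
            total += float(sum(metrics.get(stat, [])))
    return total


metrics = {
    "reader_a": {"num_items_processed": [10, 5]},
    "reader_b": {"num_items_processed": [7]},
    "writer": {"num_items_processed": [22]},
}
readers_total = sum_stat_for_prefix(metrics, "reader", "num_items_processed")
missing_total = sum_stat_for_prefix(metrics, "filter", "num_items_processed")
```

Returning `0.0` when nothing matches keeps the result usable in arithmetic without a separate existence check, at the cost of hiding a mistyped prefix.
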