
# nemo_curator.tasks.utils

## Module Contents

### Classes

| Name                                                       | Description                                                     |
| ---------------------------------------------------------- | --------------------------------------------------------------- |
| [`TaskPerfUtils`](#nemo_curator-tasks-utils-TaskPerfUtils) | Utilities for aggregating stage performance metrics from tasks. |

### API

```python
class nemo_curator.tasks.utils.TaskPerfUtils()
```

Utilities for aggregating stage performance metrics from tasks.

Example output format:

`{"StageA": {"process_time": np.array([...]), "actor_idle_time": np.array([...]), "read_time_s": np.array([...]), ...}, "StageB": {"process_time": np.array([...]), ...}}`

```python
nemo_curator.tasks.utils.TaskPerfUtils._normalize_pipeline_tasks(
    tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None
) -> dict[str, list[nemo_curator.tasks.tasks.Task]]
```

staticmethod

Return a mapping of pipeline name -> list of tasks from various input shapes.
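As a rough illustration of this normalization step, the sketch below folds the accepted input shapes into a single `dict[str, list]`. It is a minimal sketch under stated assumptions, not the library's implementation: `FakeWorkflowRunResult`, its `pipeline_tasks` attribute, and the `"default"` key used for a bare list are all hypothetical stand-ins.

```python
from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeWorkflowRunResult:
    # Hypothetical stand-in for WorkflowRunResult: pipeline name -> tasks.
    pipeline_tasks: dict[str, list[Any]] = field(default_factory=dict)


def normalize_pipeline_tasks(tasks: Any) -> dict[str, list[Any]]:
    """Fold the supported input shapes into pipeline name -> list of tasks."""
    if tasks is None:
        # No input: nothing to aggregate.
        return {}
    if isinstance(tasks, FakeWorkflowRunResult):
        # Copy the lists so callers cannot mutate the result object through the mapping.
        return {name: list(ts) for name, ts in tasks.pipeline_tasks.items()}
    if isinstance(tasks, Mapping):
        return {str(name): list(ts) for name, ts in tasks.items()}
    # A bare list of tasks goes under a single hypothetical pipeline key.
    return {"default": list(tasks)}
```

With this sketch, `normalize_pipeline_tasks(["t1", "t2"])` returns `{"default": ["t1", "t2"]}`, so downstream code only ever has to handle one shape.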

```python
nemo_curator.tasks.utils.TaskPerfUtils.aggregate_task_metrics(
    tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None,
    prefix: str | None = None
) -> dict[str, typing.Any]
```

staticmethod

Aggregate task metrics by computing their mean, standard deviation, and sum.
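A minimal sketch of the mean/std/sum reduction, assuming the per-stage metrics have already been collected into a `stage -> metric -> values` mapping. The helper name and the flat key layout (`"<prefix>_<stage>_<metric>_<reduction>"`) are hypothetical; the real method's output keys and its handling of `prefix` may differ. Plain lists and the `statistics` module stand in for NumPy arrays to keep the example dependency-free; `statistics.pstdev` is the population standard deviation, which matches NumPy's default `np.std` (`ddof=0`).

```python
from __future__ import annotations

import statistics


def aggregate_metrics(
    stage_metrics: dict[str, dict[str, list[float]]],
    prefix: str | None = None,
) -> dict[str, float]:
    """Reduce each stage/metric series to its mean, std, and sum (hypothetical key layout)."""
    out: dict[str, float] = {}
    for stage, metrics in stage_metrics.items():
        for metric, values in metrics.items():
            if not values:
                # Skip empty series rather than dividing by zero.
                continue
            base = f"{stage}_{metric}"
            if prefix:
                base = f"{prefix}_{base}"
            out[f"{base}_mean"] = statistics.fmean(values)
            # Population std (ddof=0), like np.std's default.
            out[f"{base}_std"] = statistics.pstdev(values)
            out[f"{base}_sum"] = float(sum(values))
    return out
```

Flattening to one level of keys keeps the result easy to log or compare across runs; a nested layout would work equally well.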

```python
nemo_curator.tasks.utils.TaskPerfUtils.collect_stage_metrics(
    tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None
) -> dict[str, dict[str, numpy.ndarray[float]]]
```

staticmethod

Collect per-stage metric lists from tasks or workflow outputs.

The returned mapping aggregates both built-in `StagePerfStats` metrics and any
`custom_stats` recorded by stages.

**Parameters:**

`tasks`: An iterable of tasks, a workflow result dictionary (a mapping of pipeline name -> list of tasks), or a `WorkflowRunResult`.

**Returns:** `dict[str, dict[str, np.ndarray[float]]]`

Dict mapping `stage_name` -> `metric_name` -> array of numeric values.
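The collection step can be pictured with the sketch below, which merges built-in timing fields and custom stats into the documented `stage_name -> metric_name -> values` shape. `FakeStagePerf`, `FakeTask`, and their fields (`stage_name`, `stage_perf`, and so on) are hypothetical stand-ins for the real `Task` and `StagePerfStats` objects; only `process_time`, `actor_idle_time`, and `custom_stats` echo names that appear on this page. Plain lists stand in for the NumPy arrays the real method returns.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class FakeStagePerf:
    # Hypothetical stand-in for StagePerfStats.
    stage_name: str
    process_time: float = 0.0
    actor_idle_time: float = 0.0
    custom_stats: dict[str, float] = field(default_factory=dict)


@dataclass
class FakeTask:
    # Hypothetical stand-in for Task: one perf record per stage it passed through.
    stage_perf: list[FakeStagePerf] = field(default_factory=list)


def collect_stage_metrics(tasks: list[FakeTask]) -> dict[str, dict[str, list[float]]]:
    """Group every recorded value by stage name, then by metric name."""
    metrics = defaultdict(lambda: defaultdict(list))
    for task in tasks:
        for perf in task.stage_perf:
            stage = metrics[perf.stage_name]
            # Built-in timing fields.
            stage["process_time"].append(perf.process_time)
            stage["actor_idle_time"].append(perf.actor_idle_time)
            # Stage-specific custom stats sit alongside the built-ins.
            for name, value in perf.custom_stats.items():
                stage[name].append(float(value))
    # Convert the nested defaultdicts to plain dicts for the caller.
    return {name: dict(stage) for name, stage in metrics.items()}
```

Because custom stats share the per-stage dictionary with the built-in fields, a downstream consumer can treat `read_time_s` exactly like `process_time`.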

```python
nemo_curator.tasks.utils.TaskPerfUtils.get_aggregated_stage_stat(
    tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None,
    stage_prefix: str,
    stat: str
) -> float
```

staticmethod

Get an aggregated stat for stages matching a name prefix.

Sums the requested statistic over all stages whose names start with the given prefix,
across all tasks.

**Parameters:**

`tasks`: A list of `Task` objects, a `WorkflowRunResult`, or a mapping of `pipeline_name` -> `list[Task]`.

`stage_prefix`: Match stages whose name starts with this prefix.

`stat`: The stat to extract (e.g., `"num_items_processed"`, `"process_time"`).

**Returns:** `float`

The aggregated stat value, or 0.0 if no matching stages are found.
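The prefix matching and the 0.0 fallback can be sketched over an already-collected `stage -> metric -> values` mapping. This is a minimal sketch: the helper name and the sample stage names are hypothetical, and the real method accepts tasks, a `WorkflowRunResult`, or a pipeline mapping rather than a pre-built metrics dictionary.

```python
def aggregated_stage_stat(
    stage_metrics: dict[str, dict[str, list[float]]],
    stage_prefix: str,
    stat: str,
) -> float:
    """Sum `stat` over every stage whose name starts with `stage_prefix`."""
    total = 0.0  # Stays 0.0 when no stage (or no value) matches.
    for stage_name, metrics in stage_metrics.items():
        if stage_name.startswith(stage_prefix):
            # A matching stage that never recorded `stat` contributes nothing.
            total += sum(metrics.get(stat, []))
    return total


metrics = {
    "reader_jsonl": {"num_items_processed": [10.0, 5.0]},
    "reader_parquet": {"num_items_processed": [7.0]},
    "writer": {"num_items_processed": [22.0]},
}
print(aggregated_stage_stat(metrics, "reader", "num_items_processed"))  # 22.0
```

Prefix matching lets one call roll up a family of related stages (here both hypothetical `reader_*` stages) without listing each stage name.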