---
layout: overview
slug: nemo-curator/nemo_curator/tasks/utils
title: nemo_curator.tasks.utils
---

## Module Contents

### Classes

| Name                                                       | Description                                                     |
| ---------------------------------------------------------- | --------------------------------------------------------------- |
| [`TaskPerfUtils`](#nemo_curator-tasks-utils-TaskPerfUtils) | Utilities for aggregating stage performance metrics from tasks. |

### API

<Anchor id="nemo_curator-tasks-utils-TaskPerfUtils">
  <CodeBlock showLineNumbers={false} wordWrap={true}>
    ```python
    class nemo_curator.tasks.utils.TaskPerfUtils()
    ```
  </CodeBlock>
</Anchor>

<Indent>
  Utilities for aggregating stage performance metrics from tasks.

  Example output format (stage name -> metric name -> array of values):

  `{"StageA": {"process_time": np.array([...]), "actor_idle_time": np.array([...]), "read_time_s": np.array([...]), ...}, "StageB": {"process_time": np.array([...]), ...}}`

  <Anchor id="nemo_curator-tasks-utils-TaskPerfUtils-_normalize_pipeline_tasks">
    <CodeBlock links={{"nemo_curator.tasks.tasks.Task":"/nemo-curator/nemo_curator/tasks/tasks#nemo_curator-tasks-tasks-Task","nemo_curator.pipeline.workflow.WorkflowRunResult":"/nemo-curator/nemo_curator/pipeline/workflow#nemo_curator-pipeline-workflow-WorkflowRunResult"}} showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.tasks.utils.TaskPerfUtils._normalize_pipeline_tasks(
          tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None
      ) -> dict[str, list[nemo_curator.tasks.tasks.Task]]
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    <Badge>
      staticmethod
    </Badge>

    Normalize any of the supported input shapes into a mapping of pipeline name -> list of tasks.
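
    The normalization step can be sketched in plain Python. This is a minimal illustration, not the library's implementation: the function name `normalize_pipeline_tasks`, the `"default"` key used for a bare list, and rejecting other types are all assumptions. The `WorkflowRunResult` branch is omitted because its structure is not described on this page.

```python
from collections.abc import Mapping
from typing import Any

# Hypothetical key used when a bare list of tasks is passed; the real
# library may use a different name.
DEFAULT_PIPELINE = "default"


def normalize_pipeline_tasks(tasks: Any) -> dict[str, list[Any]]:
    """Sketch: coerce the supported input shapes into {pipeline_name: [tasks]}."""
    if tasks is None:
        return {}
    if isinstance(tasks, Mapping):
        # Already keyed by pipeline name; copy each value into a list.
        return {str(name): list(task_list) for name, task_list in tasks.items()}
    if isinstance(tasks, list):
        return {DEFAULT_PIPELINE: tasks}
    # A WorkflowRunResult would need its own branch; its internals are not
    # documented here, so this sketch rejects any other type.
    raise TypeError(f"unsupported input type: {type(tasks).__name__}")


print(normalize_pipeline_tasks(None))             # {}
print(normalize_pipeline_tasks(["t1", "t2"]))     # {'default': ['t1', 't2']}
print(normalize_pipeline_tasks({"p1": ("t1",)}))  # {'p1': ['t1']}
```

    Checking for `Mapping` before `list` keeps the two shapes unambiguous, since a list is never a mapping.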
  </Indent>

  <Anchor id="nemo_curator-tasks-utils-TaskPerfUtils-aggregate_task_metrics">
    <CodeBlock links={{"nemo_curator.tasks.tasks.Task":"/nemo-curator/nemo_curator/tasks/tasks#nemo_curator-tasks-tasks-Task","nemo_curator.pipeline.workflow.WorkflowRunResult":"/nemo-curator/nemo_curator/pipeline/workflow#nemo_curator-pipeline-workflow-WorkflowRunResult"}} showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.tasks.utils.TaskPerfUtils.aggregate_task_metrics(
          tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None,
          prefix: str | None = None
      ) -> dict[str, typing.Any]
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    <Badge>
      staticmethod
    </Badge>

    Aggregate task metrics by computing the mean, standard deviation, and sum of each collected metric.
  </Indent>

  <Anchor id="nemo_curator-tasks-utils-TaskPerfUtils-collect_stage_metrics">
    <CodeBlock links={{"nemo_curator.tasks.tasks.Task":"/nemo-curator/nemo_curator/tasks/tasks#nemo_curator-tasks-tasks-Task","nemo_curator.pipeline.workflow.WorkflowRunResult":"/nemo-curator/nemo_curator/pipeline/workflow#nemo_curator-pipeline-workflow-WorkflowRunResult"}} showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.tasks.utils.TaskPerfUtils.collect_stage_metrics(
          tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None
      ) -> dict[str, dict[str, numpy.ndarray[float]]]
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    <Badge>
      staticmethod
    </Badge>

    Collect per-stage metric lists from tasks or workflow outputs.

    The returned mapping aggregates both built-in StagePerfStats metrics and any
    custom\_stats recorded by stages.

    **Parameters:**

    <ParamField path="tasks" type="list[Task] | WorkflowRunResult | Mapping[str, list[Task]] | None">
      A list of Task objects, a WorkflowRunResult, a mapping of pipeline\_name -> list\[Task], or None.
    </ParamField>

    **Returns:** `dict[str, dict[str, np.ndarray[float]]]`

    Dict mapping stage\_name -> metric\_name -> NumPy array of numeric values.
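
    The grouping can be sketched with hypothetical stand-in classes. `StagePerf`, `FakeTask`, and their fields are illustrative assumptions rather than the library's `Task` or `StagePerfStats` types, and the sketch returns plain lists where the real method returns NumPy arrays.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class StagePerf:
    """Hypothetical stand-in for a per-stage performance record."""
    stage_name: str
    process_time: float
    num_items_processed: int
    custom_stats: dict[str, float] = field(default_factory=dict)


@dataclass
class FakeTask:
    """Hypothetical stand-in for a task carrying per-stage records."""
    stage_perf: list[StagePerf]


def collect_stage_metrics(tasks: list[FakeTask]) -> dict[str, dict[str, list[float]]]:
    """Sketch: group every numeric metric by stage name, then metric name."""
    metrics = defaultdict(lambda: defaultdict(list))
    for task in tasks:
        for perf in task.stage_perf:
            stage = metrics[perf.stage_name]
            # Built-in fields first, then any custom stats the stage recorded.
            stage["process_time"].append(float(perf.process_time))
            stage["num_items_processed"].append(float(perf.num_items_processed))
            for name, value in perf.custom_stats.items():
                stage[name].append(float(value))
    # Convert the nested defaultdicts to plain dicts for the caller.
    return {name: dict(values) for name, values in metrics.items()}


tasks = [
    FakeTask([StagePerf("StageA", 0.5, 10, {"read_time_s": 0.1})]),
    FakeTask([StagePerf("StageA", 1.5, 20)]),
]
print(collect_stage_metrics(tasks))
# {'StageA': {'process_time': [0.5, 1.5], 'num_items_processed': [10.0, 20.0], 'read_time_s': [0.1]}}
```

    Because a custom stat is only appended when a stage records it, its list can be shorter than the built-in ones, as `read_time_s` is here.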
  </Indent>

  <Anchor id="nemo_curator-tasks-utils-TaskPerfUtils-get_aggregated_stage_stat">
    <CodeBlock links={{"nemo_curator.tasks.tasks.Task":"/nemo-curator/nemo_curator/tasks/tasks#nemo_curator-tasks-tasks-Task","nemo_curator.pipeline.workflow.WorkflowRunResult":"/nemo-curator/nemo_curator/pipeline/workflow#nemo_curator-pipeline-workflow-WorkflowRunResult"}} showLineNumbers={false} wordWrap={true}>
      ```python
      nemo_curator.tasks.utils.TaskPerfUtils.get_aggregated_stage_stat(
          tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None,
          stage_prefix: str,
          stat: str
      ) -> float
      ```
    </CodeBlock>
  </Anchor>

  <Indent>
    <Badge>
      staticmethod
    </Badge>

    Get an aggregated stat for stages matching a name prefix.

    Sums the performance statistics from all stages whose names start with the given prefix
    across all tasks.

    **Parameters:**

    <ParamField path="tasks" type="list[Task] | WorkflowRunResult | Mapping[str, list[Task]] | None">
      A list of Task objects, a WorkflowRunResult, a mapping of pipeline\_name -> list\[Task], or None.
    </ParamField>

    <ParamField path="stage_prefix" type="str">
      Match stages whose name starts with this prefix.
    </ParamField>

    <ParamField path="stat" type="str">
      The stat to extract (e.g., "num\_items\_processed", "process\_time").
    </ParamField>

    **Returns:** `float`

    The aggregated stat value, or 0.0 if no stages match.
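
    A minimal sketch of the prefix-match-and-sum logic, using a hypothetical `StagePerf` stand-in and treating each task as a list of per-stage records. The stage names and the choice to count a missing stat as 0.0 are assumptions for illustration.

```python
from dataclasses import dataclass


@dataclass
class StagePerf:
    """Hypothetical stand-in for a per-stage performance record."""
    stage_name: str
    stats: dict[str, float]


def get_aggregated_stage_stat(
    task_stage_perfs: list[list[StagePerf]], stage_prefix: str, stat: str
) -> float:
    """Sketch: sum `stat` over every stage whose name starts with `stage_prefix`."""
    total = 0.0
    for stage_perfs in task_stage_perfs:  # one list of records per task
        for perf in stage_perfs:
            if perf.stage_name.startswith(stage_prefix):
                # Assumption: a stage that did not record `stat` contributes 0.0.
                total += float(perf.stats.get(stat, 0.0))
    return total  # stays 0.0 when nothing matches


tasks = [
    [StagePerf("reader_jsonl", {"num_items_processed": 100})],
    [StagePerf("reader_jsonl", {"num_items_processed": 50}),
     StagePerf("writer", {"num_items_processed": 150})],
]
print(get_aggregated_stage_stat(tasks, "reader", "num_items_processed"))  # 150.0
print(get_aggregated_stage_stat(tasks, "filter", "num_items_processed"))  # 0.0
```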
  </Indent>
</Indent>
