---
layout: overview
slug: nemo-curator/nemo_curator/tasks/utils
title: nemo_curator.tasks.utils
---

## Module Contents

### Classes

| Name | Description |
| ---------------------------------------------------------- | --------------------------------------------------------------- |
| [`TaskPerfUtils`](#nemo_curator-tasks-utils-TaskPerfUtils) | Utilities for aggregating stage performance metrics from tasks. |

### API

```python
class nemo_curator.tasks.utils.TaskPerfUtils()
```

Utilities for aggregating stage performance metrics from tasks.

Example output format: `{"StageA": {"process_time": np.array([...]), "actor_idle_time": np.array([...]), "read_time_s": np.array([...]), ...}, "StageB": {"process_time": np.array([...]), ...}}`

```python
nemo_curator.tasks.utils.TaskPerfUtils._normalize_pipeline_tasks(
    tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None
) -> dict[str, list[nemo_curator.tasks.tasks.Task]]
```

staticmethod

Return a mapping of pipeline name -> list of tasks from various input shapes.

```python
nemo_curator.tasks.utils.TaskPerfUtils.aggregate_task_metrics(
    tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None,
    prefix: str | None = None
) -> dict[str, typing.Any]
```

staticmethod

Aggregate task metrics by computing mean/std/sum.

```python
nemo_curator.tasks.utils.TaskPerfUtils.collect_stage_metrics(
    tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None
) -> dict[str, dict[str, numpy.ndarray[float]]]
```

staticmethod

Collect per-stage metric lists from tasks or workflow outputs.
The returned mapping aggregates both the built-in `StagePerfStats` metrics and any `custom_stats` recorded by stages.

**Parameters:**

- `tasks`: A list of tasks, a `WorkflowRunResult`, or a mapping of pipeline name -> list of tasks.

**Returns:** `dict[str, dict[str, np.ndarray[float]]]`

A dict mapping `stage_name` -> `metric_name` -> array of numeric values.

```python
nemo_curator.tasks.utils.TaskPerfUtils.get_aggregated_stage_stat(
    tasks: list[nemo_curator.tasks.tasks.Task] | nemo_curator.pipeline.workflow.WorkflowRunResult | collections.abc.Mapping[str, list[nemo_curator.tasks.tasks.Task]] | None,
    stage_prefix: str,
    stat: str
) -> float
```

staticmethod

Get an aggregated stat for stages matching a name prefix.

Sums the given performance statistic over all stages whose names start with the given prefix, across all tasks.

**Parameters:**

- `tasks`: A list of `Task` objects, a `WorkflowRunResult`, or a mapping of `pipeline_name` -> `list[Task]`.
- `stage_prefix`: Match stages whose name starts with this prefix.
- `stat`: The stat to extract (e.g., `"num_items_processed"`, `"process_time"`).

**Returns:** `float`

The aggregated stat value, or `0.0` if no stages match.
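The stage -> metric -> values layout returned by `collect_stage_metrics`, the prefix-matching sum performed by `get_aggregated_stage_stat`, and a mean/std/sum reduction can be illustrated with a small standalone sketch. It does not import `nemo_curator` and is not the library's implementation. It assumes each task is a hypothetical list of `(stage_name, stats_dict)` pairs standing in for a task's per-stage performance records. It uses plain Python lists where the real method returns NumPy arrays, and uses the population standard deviation (NumPy's `ddof=0` default) as one plausible choice. The helper `summarize`, its nested output layout, and the stage names `ReaderStage`/`FilterStageA`/`FilterStageB` are all illustrative assumptions.

```python
from collections import defaultdict
from statistics import fmean, pstdev

# Hypothetical stand-in for a task: a list of (stage_name, stats) pairs,
# one per stage the task passed through. This is NOT nemo_curator's Task class.
TaskRecord = list[tuple[str, dict[str, float]]]


def collect_stage_metrics(tasks: list[TaskRecord]) -> dict[str, dict[str, list[float]]]:
    """Group every recorded value by stage name, then by metric name."""
    grouped = defaultdict(lambda: defaultdict(list))
    for task in tasks:
        for stage_name, stats in task:
            for metric_name, value in stats.items():
                grouped[stage_name][metric_name].append(float(value))
    # Convert the nested defaultdicts to plain dicts for a clean return value.
    return {stage: dict(metrics) for stage, metrics in grouped.items()}


def get_aggregated_stage_stat(tasks: list[TaskRecord], stage_prefix: str, stat: str) -> float:
    """Sum `stat` over every stage whose name starts with `stage_prefix`; 0.0 if none match."""
    total = 0.0
    for task in tasks:
        for stage_name, stats in task:
            if stage_name.startswith(stage_prefix) and stat in stats:
                total += float(stats[stat])
    return total


def summarize(metrics: dict[str, dict[str, list[float]]]) -> dict[str, dict[str, dict[str, float]]]:
    """Reduce each value list to mean / population std / sum (hypothetical output layout)."""
    return {
        stage: {
            name: {"mean": fmean(vals), "std": pstdev(vals), "sum": sum(vals)}
            for name, vals in per_metric.items()
        }
        for stage, per_metric in metrics.items()
    }


if __name__ == "__main__":
    tasks = [
        [("ReaderStage", {"process_time": 1.0}), ("FilterStageA", {"process_time": 2.0})],
        [("ReaderStage", {"process_time": 3.0}), ("FilterStageB", {"process_time": 4.0})],
    ]
    # Both "FilterStage*" stages match the prefix, so their times are summed.
    print(get_aggregated_stage_stat(tasks, "Filter", "process_time"))  # 6.0
    print(summarize(collect_stage_metrics(tasks))["ReaderStage"]["process_time"])
```

Because matching is a plain `str.startswith` check, a prefix such as `"Filter"` groups every stage that shares it, which is convenient when a pipeline splits one logical step across several similarly named stages; a prefix that matches nothing simply yields `0.0`.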