# Source code for nemo_evaluator.core.evaluate

# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import importlib
import json
import os
import signal
import sys
import time
from typing import Optional

import psutil
import yaml

from nemo_evaluator.adapters.server import AdapterServerProcess
from nemo_evaluator.api.api_dataclasses import (
    Evaluation,
    EvaluationConfig,
    EvaluationMetadata,
    EvaluationResult,
    EvaluationTarget,
)
from nemo_evaluator.core.input import prepare_output_directory, validate_configuration
from nemo_evaluator.core.resources import (
    aggregate_runtime_metrics,
    monitor_memory_usage,
)
from nemo_evaluator.core.utils import run_command
from nemo_evaluator.logging import get_logger
from nemo_evaluator.package_info import __version__

# Module-level logger bound to this module's dotted path.
logger = get_logger(__name__)

# Explicit public API: only `evaluate` is meant for external use.
__all__ = ["evaluate"]


def parse_output(evaluation: Evaluation) -> EvaluationResult:
    """Parse evaluation output by importing the appropriate output module.

    Uses the Python namespace-package mechanism for both ``nemo_evaluator``
    and ``core_evals``. Tries the ``nemo_evaluator`` namespace first, then
    falls back to ``core_evals``.

    Args:
        evaluation: Validated evaluation whose ``pkg_name`` selects the
            harness and whose ``config.output_dir`` holds the raw output.

    Returns:
        EvaluationResult: Result produced by the harness's ``parse_output``.

    Raises:
        ImportError: If no output parser is found in either namespace.
    """
    # Try nemo_evaluator namespace first (custom harnesses from .nemo_evaluator directories).
    # NOTE: ModuleNotFoundError is a subclass of ImportError, so catching
    # ImportError alone covers both; the previous two-class tuple was redundant.
    try:
        output_module = importlib.import_module(
            f"nemo_evaluator.{evaluation.pkg_name}.output"
        )
        logger.debug(
            f"Loaded output parser from nemo_evaluator.{evaluation.pkg_name}.output"
        )
        return output_module.parse_output(evaluation.config.output_dir)
    except ImportError:
        logger.debug(
            f"Module nemo_evaluator.{evaluation.pkg_name}.output not found, trying core_evals"
        )

    # Fall back to core_evals namespace
    try:
        output_module = importlib.import_module(
            f"core_evals.{evaluation.pkg_name}.output"
        )
        logger.debug(
            f"Loaded output parser from core_evals.{evaluation.pkg_name}.output"
        )
        return output_module.parse_output(evaluation.config.output_dir)
    except ImportError as e:
        logger.error(
            f"Failed to import output parser for {evaluation.pkg_name}. "
            f"Tried both nemo_evaluator.{evaluation.pkg_name}.output and core_evals.{evaluation.pkg_name}.output"
        )
        raise ImportError(
            f"Could not find output parser for harness '{evaluation.pkg_name}'. "
            f"Make sure the harness is in nemo_evaluator or core_evals namespace."
        ) from e


def _run_evaluation(
    evaluation: Evaluation,
    target_cfg: EvaluationTarget,
    metadata: Optional[EvaluationMetadata],
) -> EvaluationResult:
    """Core evaluation logic.

    Persists metadata, installs SIGTERM/SIGINT handlers that tear down the
    whole child-process tree, runs the rendered harness command (behind an
    adapter server unless the adapter is in client mode), monitors memory
    usage, merges runtime metrics into ``eval_factory_metrics.json``, and
    writes ``results.yml`` to the output directory.

    Args:
        evaluation: Validated evaluation object.
        target_cfg: Target configuration.
        metadata: Optional evaluation metadata.

    Returns:
        EvaluationResult: Evaluation results and metadata.
    """
    metadata_block = _persist_metadata_and_build_results_block(
        evaluation.config.output_dir, metadata
    )

    # Check if adapter is in client mode (no server needed)
    use_client_mode = (
        target_cfg.api_endpoint
        and target_cfg.api_endpoint.adapter_config
        and target_cfg.api_endpoint.adapter_config.mode == "client"
    )

    # Track if graceful cleanup has been performed
    # (guards against running post-eval hooks twice: once via SIGTERM and
    # once on the normal completion path below).
    cleanup_performed = False

    def run_client_mode_cleanup():
        """Run post-eval hooks for client mode during graceful shutdown."""
        nonlocal cleanup_performed
        if cleanup_performed:
            return

        # Set the flag before doing work so a re-entrant signal cannot
        # trigger a second execution.
        cleanup_performed = True
        if (
            use_client_mode
            and target_cfg.api_endpoint
            and target_cfg.api_endpoint.adapter_config
        ):
            try:
                # Imported lazily; only needed on this path.
                from nemo_evaluator.adapters.pipeline import AdapterPipeline

                pipeline = AdapterPipeline(
                    target_cfg.api_endpoint.adapter_config,
                    evaluation.config.output_dir,
                    target_cfg.api_endpoint.model_id,
                )
                pipeline.run_post_eval_hooks(url=target_cfg.api_endpoint.url or "")
                logger.info("Post-eval hooks executed during shutdown")
            except Exception as e:
                # Best effort: shutdown must proceed even if hooks fail.
                logger.error(f"Failed to run post-eval hooks during shutdown: {e}")

    def kill_all(signum=None, frame=None):
        """Kill all processes and exit.

        Installed for both SIGTERM (graceful: run post-eval hooks, SIGTERM
        children) and SIGINT (fast: forward SIGINT, shorter wait).
        Always exits the process with status 1.
        """
        logger.critical("FATAL: Terminating all processes...")

        # For SIGTERM (graceful shutdown), run client mode cleanup first
        if signum == signal.SIGTERM:
            run_client_mode_cleanup()

        parent = psutil.Process(os.getpid())  # current process
        children = parent.children(recursive=True)
        for child in children:
            if signum == signal.SIGINT:
                # Send SIGINT to children for immediate termination (skip post-eval hooks)
                child.send_signal(signal.SIGINT)
            else:
                # Send SIGTERM to children for graceful termination (run post-eval hooks)
                child.terminate()

        # Use faster timeout for keyboard interrupt (SIGINT)
        timeout = 1 if signum == signal.SIGINT else 5
        gone, alive = psutil.wait_procs(children, timeout=timeout)
        for child in alive:
            # Anything still alive after the grace period gets SIGKILL.
            logger.warning(f"Force killing child process {child.pid}")
            child.kill()

        sys.exit(1)

    # Set up signal handlers
    signal.signal(signal.SIGTERM, kill_all)
    signal.signal(signal.SIGINT, kill_all)

    def run_evaluation_core():
        # NOTE: if we use NeMoEvaluatorClient on the benchmark side, there's no need to
        # run adapter server for evaluation and we can use client model here
        if use_client_mode:
            logger.info("Using client mode - skipping adapter server")
            cmd = evaluation.render_command()
            run_command(cmd, verbose=True, propagate_errors=True)
            evaluation_result = parse_output(evaluation)

            # In client mode, explicitly run post-eval hooks after command completes
            # This ensures hooks run reliably from the parent process rather than
            # depending on finalizers in the subprocess during interpreter shutdown
            # Note: cleanup_performed flag prevents double execution if SIGTERM was received
            if not cleanup_performed:
                run_client_mode_cleanup()

            return evaluation_result
        else:
            # Server mode: the adapter server lives for the duration of the
            # harness command and is torn down by the context manager.
            with AdapterServerProcess(evaluation):
                cmd = evaluation.render_command()
                run_command(cmd, verbose=True, propagate_errors=True)
                evaluation_result = parse_output(evaluation)
                return evaluation_result

    # Get cache directory from caching interceptor configuration
    # (first enabled "caching" interceptor with a cache_dir wins).
    cache_dir = None
    if (
        target_cfg.api_endpoint
        and target_cfg.api_endpoint.adapter_config
        and target_cfg.api_endpoint.adapter_config.interceptors
    ):
        for interceptor in target_cfg.api_endpoint.adapter_config.interceptors:
            if (
                interceptor.name == "caching"
                and interceptor.enabled
                and interceptor.config
                and interceptor.config.get("cache_dir")
            ):
                cache_dir = interceptor.config["cache_dir"]
                logger.info(f"Using caching interceptor cache_dir: {cache_dir}")
                break

    if not cache_dir:
        logger.info("No cache directory configured, token usage will not be collected")

    # Run the evaluation while sampling memory usage every 100 ms.
    evaluation_result, metrics = monitor_memory_usage(
        run_evaluation_core,
        interval_ms=100,
        cache_dir=cache_dir,
        output_dir=evaluation.config.output_dir,
    )

    metrics_path = os.path.join(
        evaluation.config.output_dir, "eval_factory_metrics.json"
    )

    # Read existing metrics if file exists
    existing_metrics = {}
    if os.path.exists(metrics_path):
        try:
            with open(metrics_path, "r") as f:
                existing_metrics = json.load(f)
        except (json.JSONDecodeError, IOError):
            pass  # Start fresh if file is corrupted

    # Aggregate all run data from run_times directory
    aggregated_metrics = aggregate_runtime_metrics(evaluation.config.output_dir)

    if aggregated_metrics:
        runtime = aggregated_metrics.get("runtime_seconds", 0)
        inference_time = aggregated_metrics.get("inference_time_seconds", 0)
        scoring_time = aggregated_metrics.get("scoring_time_seconds", 0)
        logger.info(
            "Aggregated metrics",
            runtime_seconds=runtime,
            inference_time_seconds=inference_time,
            scoring_time_seconds=scoring_time,
            peak_memory_bytes=aggregated_metrics.get("peak_memory_bytes", 0),
            total_runs=aggregated_metrics.get("total_runs", 0),
        )

    # Use aggregated metrics if available, otherwise use current metrics
    final_metrics = aggregated_metrics if aggregated_metrics else metrics

    # Merge with existing metrics, using "evaluation" as the key
    # If evaluation key already exists, merge the metrics instead of overwriting
    if "evaluation" in existing_metrics:
        # Aggregate existing evaluation metrics with new ones
        existing_eval = existing_metrics["evaluation"]
        if isinstance(existing_eval, dict) and isinstance(final_metrics, dict):
            # Merge dictionaries with appropriate aggregation strategy:
            # numeric values are summed, except peak-memory values which take
            # the max; everything else is overwritten by the new run.
            merged_eval = existing_eval.copy()
            for key, value in final_metrics.items():
                if (
                    key in merged_eval
                    and isinstance(merged_eval[key], (int, float))
                    and isinstance(value, (int, float))
                ):
                    # NOTE(review): the "runtime_seconds" branch is identical
                    # to the numeric else-branch (+=); kept as-is here, but it
                    # could be collapsed without behavior change.
                    if key in ["runtime_seconds"]:
                        merged_eval[key] += value
                    elif key in ["peak_memory_bytes", "peak_tree_memory_bytes"]:
                        merged_eval[key] = max(merged_eval[key], value)
                    else:
                        merged_eval[key] += value
                # NOTE(review): the start_time/end_time branches below are
                # identical to the final else (plain overwrite); redundant.
                elif key == "end_time":
                    merged_eval[key] = value
                elif key == "start_time":
                    merged_eval[key] = value
                else:
                    merged_eval[key] = value
            merged_metrics = {**existing_metrics, "evaluation": merged_eval}
        else:
            merged_metrics = {**existing_metrics, "evaluation": final_metrics}
    else:
        merged_metrics = {**existing_metrics, "evaluation": final_metrics}

    # Write merged metrics to file
    with open(metrics_path, "w") as f:
        json.dump(merged_metrics, f, indent=2)

    # Assemble the results payload; metadata_block contributes the
    # "metadata" key (versioning info) when metadata was provided.
    evaluation_result_dict = {
        "git_hash": os.getenv("CORE_EVALS_GIT_HASH"),
        "command": evaluation.render_command(),
        "config": evaluation.config.model_dump(exclude_none=True),
        "target": evaluation.target.model_dump(exclude_none=True),
        "results": evaluation_result.model_dump(exclude_none=True),
        **metadata_block,
    }

    logger.info(yaml.dump(evaluation_result_dict))

    with open(os.path.join(evaluation.config.output_dir, "results.yml"), "w") as f:
        yaml.dump(evaluation_result_dict, f)

    return evaluation_result


def evaluate(
    eval_cfg: EvaluationConfig,
    target_cfg: EvaluationTarget,
    metadata: Optional[EvaluationMetadata] = None,
) -> EvaluationResult:
    """Run an evaluation using configuration objects.

    Args:
        eval_cfg: Evaluation configuration object containing output directory,
            parameters, and evaluation type
        target_cfg: Target configuration object containing API endpoint
            details and adapter configuration
        metadata: Optional evaluation metadata persisted alongside results

    Returns:
        EvaluationResult: Evaluation results and metadata
    """
    # Imported lazily to avoid import-time cycles with the package root.
    from nemo_evaluator.config import TelemetryLevel
    from nemo_evaluator.telemetry import (
        EvaluationTaskEvent,
        StatusEnum,
        TelemetryHandler,
        get_session_id,
        get_telemetry_level,
    )

    start_time = time.time()

    # Telemetry is optional: only start a handler when not opted out.
    telemetry_handler = None
    telemetry_level = get_telemetry_level()
    if telemetry_level != TelemetryLevel.OFF:
        telemetry_handler = TelemetryHandler(
            source_client_version=__version__,
            session_id=get_session_id(),
            telemetry_level=telemetry_level,
        )
        telemetry_handler.start()

    run_config = {
        "config": eval_cfg.model_dump(),
        "target": target_cfg.model_dump(),
    }
    evaluation = validate_configuration(run_config)
    prepare_output_directory(evaluation)

    model_name = (
        target_cfg.api_endpoint.model_id
        if target_cfg.api_endpoint and target_cfg.api_endpoint.model_id
        else "unknown"
    )
    # Only report the real model name at the DEFAULT telemetry level.
    model_name_for_telemetry = (
        model_name if telemetry_level == TelemetryLevel.DEFAULT else "redacted"
    )

    if telemetry_handler:
        telemetry_handler.enqueue(
            EvaluationTaskEvent(
                task=eval_cfg.type,
                framework_name=evaluation.framework_name,
                model=model_name_for_telemetry,
                status=StatusEnum.STARTED,
            )
        )

    # Default to FAILURE; flipped to SUCCESS only after _run_evaluation
    # returns, so the finally-block reports failures and exceptions alike.
    status = StatusEnum.FAILURE
    try:
        result = _run_evaluation(evaluation, target_cfg, metadata)
        status = StatusEnum.SUCCESS
        return result
    finally:
        if telemetry_handler:
            telemetry_handler.enqueue(
                EvaluationTaskEvent(
                    task=eval_cfg.type,
                    framework_name=evaluation.framework_name,
                    model=model_name_for_telemetry,
                    execution_duration_seconds=time.time() - start_time,
                    status=status,
                )
            )
            telemetry_handler.stop()
def _write_with_versioning_header(
    out_dir: str,
    filename: str,
    payload: dict,
    versioning: dict,
) -> None:
    """Write *payload* as YAML to ``out_dir/filename`` behind a versioning header.

    The header renders *versioning* as ``#``-commented YAML lines, so any YAML
    parser still reads the payload unchanged while humans see provenance.

    Args:
        out_dir: Directory to write into.
        filename: Name of the YAML file to create/overwrite.
        payload: Full document to dump after the header.
        versioning: Component-version mapping rendered into the header.
    """
    header = (
        "# Generated by nemo-evaluator invocation with the versions of components:\n"
    )
    versioning_yaml = yaml.safe_dump(versioning, sort_keys=False).rstrip("\n")
    header += "\n".join(f"# {line}" for line in versioning_yaml.splitlines()) + "\n"
    path = os.path.join(out_dir, filename)
    with open(path, "w") as f:
        f.write(header)
        yaml.safe_dump(payload, f, sort_keys=False)


def _persist_metadata_and_build_results_block(
    out_dir: str, md: Optional[EvaluationMetadata]
) -> dict:
    """Persist the entire metadata object and return a results.yml block.

    Writes the full metadata payload to `metadata.yaml` with a versioning header.
    Returns a "metadata.versioning" block which later will be included into results.

    Args:
        out_dir: Output directory for ``metadata.yaml``.
        md: Metadata to persist; ``None``/empty yields an empty block.

    Returns:
        dict: ``{"metadata": {...}}`` block for results.yml, or ``{}``.
    """
    if not md:
        return {}

    # Deep-copy so the caller's metadata object is never mutated.
    md_modified = copy.deepcopy(md)

    # Build versioning block first so we can include it as a commented header
    # in the persisted metadata file for better provenance/debuggability.
    # NOTE(review): assumes EvaluationMetadata supports dict-style access
    # (`.get` and item assignment) — confirm against its definition.
    updated_versioning: dict = dict(md_modified.get("versioning", {}))
    git_hash = os.getenv("CORE_EVALS_GIT_HASH")
    if git_hash:
        updated_versioning["git-hash"] = git_hash

    # TODO(agronskiy): we cannot import top level because due to alphabetic auto-sorting in
    # nemo_evaluator.__init__ this leads to circular imports. The said autosorting cannot
    # yet be ignored per-import since this functionality is not supported yet in
    # ruff.
    from nemo_evaluator import __version__ as nemo_evaluator_version

    updated_versioning["nemo_evaluator"] = nemo_evaluator_version

    # Construct full metadata payload to persist, augmenting versioning
    # with inferred fields (git-hash, nemo_evaluator version).
    md_modified["versioning"] = updated_versioning

    # Fix: write via the header helper so metadata.yaml actually carries the
    # versioning header the docstring promises. Previously the helper above
    # was defined but never called and the file was dumped bare; YAML comments
    # are ignored by parsers, so downstream consumers are unaffected.
    _write_with_versioning_header(out_dir, "metadata.yaml", md_modified, updated_versioning)

    # For the results.yaml block we only return versioning to keep it uncluttered
    return {
        "metadata": {
            "versioning": updated_versioning,
            "__skipped_fields": "see metadata.yaml for the rest of the fields",
        }
    }