# Source code for nemo_evaluator.core.evaluate

# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import importlib
import json
import math
import os
import signal
import sys
import time
from typing import Optional

import psutil
import yaml

from nemo_evaluator.adapters.server import AdapterServerProcess
from nemo_evaluator.api.api_dataclasses import (
    Evaluation,
    EvaluationConfig,
    EvaluationMetadata,
    EvaluationResult,
    EvaluationTarget,
)
from nemo_evaluator.core.input import (
    prepare_output_directory,
    validate_configuration,
    verify_capabilities,
)
from nemo_evaluator.core.resources import (
    aggregate_runtime_metrics,
    monitor_memory_usage,
)
from nemo_evaluator.core.utils import run_command
from nemo_evaluator.logging import get_logger
from nemo_evaluator.package_info import __version__

# Module-level logger bound to this module's dotted path.
logger = get_logger(__name__)

# Public API of this module; the marker filename is exported so callers can
# detect an interrupted run from the output directory.
__all__ = ["evaluate", "INTERRUPTED_MARKER_FILENAME"]

SHUTDOWN_TIMEOUT_ENV_VAR = "NEMO_EVALUATOR_SHUTDOWN_TIMEOUT_SECONDS"
DEFAULT_SHUTDOWN_TIMEOUT_SECONDS = 30.0
INTERRUPTED_MARKER_FILENAME = ".nemo_evaluator_interrupted"
SERVER_ERROR_MARKER_FILENAME = ".nemo_evaluator_server_error"


def _get_interrupted_marker_path(output_dir: str) -> str:
    return os.path.join(output_dir, INTERRUPTED_MARKER_FILENAME)


def _clear_marker(output_dir: str, filename: str) -> None:
    marker_path = os.path.join(output_dir, filename)
    try:
        os.remove(marker_path)
    except FileNotFoundError:
        return
    except OSError as e:
        logger.warning(f"Failed to remove marker {marker_path}: {e}")


def _clear_interrupted_marker(output_dir: str) -> None:
    """Remove a stale interruption marker from a previous run, if present."""
    _clear_marker(output_dir, INTERRUPTED_MARKER_FILENAME)


def _write_interrupted_marker(output_dir: str, signum: Optional[int] = None) -> None:
    """Record an interruption as a small JSON marker file in *output_dir*.

    The payload stores the signal name (or raw number when unknown) and the
    wall-clock timestamp. Write failures are logged, never raised.
    """
    path = _get_interrupted_marker_path(output_dir)

    name: Optional[str] = None
    if signum is not None:
        try:
            name = signal.Signals(signum).name
        except ValueError:
            # Not a known signal number on this platform — keep the raw value.
            name = str(signum)

    try:
        with open(path, "w") as fh:
            json.dump({"signal": name, "timestamp": time.time()}, fh)
    except OSError as err:
        logger.warning(f"Failed to write interruption marker {path}: {err}")


def _get_shutdown_timeout_seconds() -> float:
    """Return the graceful shutdown timeout for child processes.

    Honors the ``NEMO_EVALUATOR_SHUTDOWN_TIMEOUT_SECONDS`` override when it
    parses to a positive finite float; otherwise falls back to the default
    and logs a warning.
    """
    configured = os.getenv(SHUTDOWN_TIMEOUT_ENV_VAR)
    if configured is None:
        return DEFAULT_SHUTDOWN_TIMEOUT_SECONDS

    try:
        value = float(configured)
    except ValueError:
        logger.warning(
            f"Invalid {SHUTDOWN_TIMEOUT_ENV_VAR}={configured!r}; "
            f"falling back to {DEFAULT_SHUTDOWN_TIMEOUT_SECONDS} seconds"
        )
        return DEFAULT_SHUTDOWN_TIMEOUT_SECONDS

    # Reject NaN, +/-inf, zero and negatives — a timeout must be usable
    # directly by psutil.wait_procs.
    if math.isfinite(value) and value > 0:
        return value

    logger.warning(
        f"{SHUTDOWN_TIMEOUT_ENV_VAR}={configured!r} is not a positive finite number; "
        f"falling back to {DEFAULT_SHUTDOWN_TIMEOUT_SECONDS} seconds"
    )
    return DEFAULT_SHUTDOWN_TIMEOUT_SECONDS


def parse_output(evaluation: Evaluation) -> EvaluationResult:
    """Parse evaluation output by importing the appropriate output module.

    Uses Python namespace package mechanism for both nemo_evaluator and core_evals.
    Tries nemo_evaluator namespace first, then falls back to core_evals.

    Args:
        evaluation: Validated evaluation; ``pkg_name`` selects the harness and
            ``config.output_dir`` is handed to the harness-specific parser.

    Returns:
        EvaluationResult produced by the harness's ``parse_output``.

    Raises:
        ImportError: If neither namespace provides an output module for the
            harness. ImportErrors raised while the parser *runs* propagate
            unchanged instead of being mistaken for a missing module.
    """
    # Try nemo_evaluator namespace first (custom harnesses from .nemo_evaluator directories)
    # Only the import itself lives in the try block: previously the parse call
    # was inside it too, so an ImportError raised by the parser's own code was
    # silently treated as "harness not found". ModuleNotFoundError is a
    # subclass of ImportError, so a single except clause suffices.
    try:
        output_module = importlib.import_module(
            f"nemo_evaluator.{evaluation.pkg_name}.output"
        )
    except ImportError:
        logger.debug(
            f"Module nemo_evaluator.{evaluation.pkg_name}.output not found, trying core_evals"
        )
    else:
        logger.debug(
            f"Loaded output parser from nemo_evaluator.{evaluation.pkg_name}.output"
        )
        return output_module.parse_output(evaluation.config.output_dir)

    # Fall back to core_evals namespace
    try:
        output_module = importlib.import_module(
            f"core_evals.{evaluation.pkg_name}.output"
        )
    except ImportError as e:
        logger.error(
            f"Failed to import output parser for {evaluation.pkg_name}. "
            f"Tried both nemo_evaluator.{evaluation.pkg_name}.output and core_evals.{evaluation.pkg_name}.output"
        )
        raise ImportError(
            f"Could not find output parser for harness '{evaluation.pkg_name}'. "
            f"Make sure the harness is in nemo_evaluator or core_evals namespace."
        ) from e

    logger.debug(
        f"Loaded output parser from core_evals.{evaluation.pkg_name}.output"
    )
    return output_module.parse_output(evaluation.config.output_dir)


def _run_evaluation(
    evaluation: Evaluation,
    target_cfg: EvaluationTarget,
    metadata: Optional[EvaluationMetadata],
) -> EvaluationResult:
    """Core evaluation logic.

    Persists metadata, installs SIGTERM/SIGINT handlers that tear down the
    whole child-process tree, runs the evaluation command (behind the adapter
    server unless the adapter runs in client mode), merges runtime metrics
    into ``eval_factory_metrics.json``, and writes ``results.yml``.

    Args:
        evaluation: Validated evaluation object.
        target_cfg: Target configuration.
        metadata: Optional evaluation metadata.

    Returns:
        EvaluationResult: Evaluation results and metadata.
    """
    # Persist full metadata to metadata.yaml and keep the block that will be
    # embedded into results.yml at the end of this function.
    metadata_block = _persist_metadata_and_build_results_block(
        evaluation.config.output_dir, metadata
    )

    # Check if adapter is in client mode (no server needed)
    use_client_mode = (
        target_cfg.api_endpoint
        and target_cfg.api_endpoint.adapter_config
        and target_cfg.api_endpoint.adapter_config.mode == "client"
    )

    # Import eagerly so it is available inside the signal handler without
    # risking a deadlock on the import lock.
    from nemo_evaluator.adapters.pipeline import AdapterPipeline

    # Cache timeout so the signal handler avoids redundant env-var parsing.
    shutdown_timeout = _get_shutdown_timeout_seconds()

    # Track if graceful cleanup has been performed.
    # None = not attempted yet; True/False = attempted and succeeded/failed.
    cleanup_status: Optional[bool] = None

    def run_client_mode_cleanup() -> bool:
        """Run post-eval hooks for client mode during graceful shutdown."""
        nonlocal cleanup_status
        # Idempotent: a repeat call returns the previously recorded outcome.
        if cleanup_status is not None:
            return cleanup_status

        cleanup_status = True
        # Nothing to do unless we are in client mode with an adapter config.
        if not (
            use_client_mode
            and target_cfg.api_endpoint
            and target_cfg.api_endpoint.adapter_config
        ):
            return cleanup_status

        try:
            pipeline = AdapterPipeline(
                target_cfg.api_endpoint.adapter_config,
                evaluation.config.output_dir,
                target_cfg.api_endpoint.model_id,
            )
            pipeline.run_post_eval_hooks(url=target_cfg.api_endpoint.url or "")
            logger.info("Post-eval hooks executed during shutdown")
        except Exception as e:
            # Record the failure but keep shutting down; the caller folds this
            # into the process exit code.
            cleanup_status = False
            logger.error(f"Failed to run post-eval hooks during shutdown: {e}")

        return cleanup_status

    def kill_all(signum=None, frame=None):
        """Kill all processes and exit."""
        # If another SIGINT/SIGTERM arrives during shutdown, use default handling
        # instead of re-entering kill_all and repeating cleanup/termination logic.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGINT, signal.SIG_DFL)

        logger.info("Terminating all processes...")

        server_error_shutdown = False
        cleanup_succeeded = True
        if signum == signal.SIGTERM:
            # The server-error marker (presumably written when the adapter
            # server fails — confirm against AdapterServerProcess) means this
            # SIGTERM is not an operator interruption, so skip writing the
            # interrupted marker in that case.
            server_error_path = os.path.join(
                evaluation.config.output_dir, SERVER_ERROR_MARKER_FILENAME
            )
            server_error_shutdown = os.path.exists(server_error_path)
            if not server_error_shutdown:
                _write_interrupted_marker(evaluation.config.output_dir, signum)
        if signum != signal.SIGINT:
            # Graceful path (SIGTERM or direct call): run post-eval hooks.
            cleanup_succeeded = run_client_mode_cleanup()

        parent = psutil.Process(os.getpid())  # current process
        children = parent.children(recursive=True)
        for child in children:
            if signum == signal.SIGINT:
                # Send SIGINT to children for immediate termination (skip post-eval hooks)
                child.send_signal(signal.SIGINT)
            else:
                # Send SIGTERM to children for graceful termination (run post-eval hooks)
                child.terminate()

        # SIGINT is urgent: wait only 1s before force-killing survivors;
        # otherwise honor the configured graceful-shutdown timeout.
        timeout = 1 if signum == signal.SIGINT else shutdown_timeout
        _, alive = psutil.wait_procs(children, timeout=timeout)
        for child in alive:
            logger.warning(
                f"Force killing child process {child.pid} after waiting {timeout} seconds"
            )
            child.kill()

        if signum == signal.SIGINT:
            # Conventional 128 + signum exit status (130 for SIGINT).
            exit_code = 128 + signal.SIGINT
        else:
            # 1 on server error; otherwise 0 only when cleanup succeeded and
            # no child had to be force-killed.
            exit_code = (
                1
                if server_error_shutdown
                else 0
                if cleanup_succeeded and not alive
                else 1
            )
        sys.exit(exit_code)

    # Set up signal handlers
    signal.signal(signal.SIGTERM, kill_all)
    signal.signal(signal.SIGINT, kill_all)

    def run_evaluation_core():
        # NOTE: if we use NeMoEvaluatorClient on the benchmark side, there's no need to
        # run adapter server for evaluation and we can use client model here
        if use_client_mode:
            logger.info("Using client mode - skipping adapter server")
            cmd = evaluation.render_command()
            run_command(cmd, verbose=True, propagate_errors=True)
            evaluation_result = parse_output(evaluation)

            # In client mode, explicitly run post-eval hooks after command completes
            # This ensures hooks run reliably from the parent process rather than
            # depending on finalizers in the subprocess during interpreter shutdown
            # Note: cleanup_status prevents double execution if a signal was received
            if cleanup_status is None:
                run_client_mode_cleanup()

            return evaluation_result
        else:
            with AdapterServerProcess(evaluation):
                cmd = evaluation.render_command()
                run_command(cmd, verbose=True, propagate_errors=True)
                evaluation_result = parse_output(evaluation)
                return evaluation_result

    # Get cache directory from caching interceptor configuration
    cache_dir = None
    if (
        target_cfg.api_endpoint
        and target_cfg.api_endpoint.adapter_config
        and target_cfg.api_endpoint.adapter_config.interceptors
    ):
        # First enabled caching interceptor that declares a cache_dir wins.
        for interceptor in target_cfg.api_endpoint.adapter_config.interceptors:
            if (
                interceptor.name == "caching"
                and interceptor.enabled
                and interceptor.config
                and interceptor.config.get("cache_dir")
            ):
                cache_dir = interceptor.config["cache_dir"]
                logger.info(f"Using caching interceptor cache_dir: {cache_dir}")
                break

    if not cache_dir:
        logger.info("No cache directory configured, token usage will not be collected")

    # Run the evaluation under the memory monitor; returns the result plus
    # the resource metrics collected while it ran.
    evaluation_result, metrics = monitor_memory_usage(
        run_evaluation_core,
        interval_ms=100,
        cache_dir=cache_dir,
        output_dir=evaluation.config.output_dir,
    )

    metrics_path = os.path.join(
        evaluation.config.output_dir, "eval_factory_metrics.json"
    )

    # Read existing metrics if file exists
    existing_metrics = {}
    if os.path.exists(metrics_path):
        try:
            with open(metrics_path, "r") as f:
                existing_metrics = json.load(f)
        except (json.JSONDecodeError, IOError):
            pass  # Start fresh if file is corrupted

    # Aggregate all run data from run_times directory
    aggregated_metrics = aggregate_runtime_metrics(evaluation.config.output_dir)

    if aggregated_metrics:
        runtime = aggregated_metrics.get("runtime_seconds", 0)
        inference_time = aggregated_metrics.get("inference_time_seconds", 0)
        scoring_time = aggregated_metrics.get("scoring_time_seconds", 0)
        logger.info(
            "Aggregated metrics",
            runtime_seconds=runtime,
            inference_time_seconds=inference_time,
            scoring_time_seconds=scoring_time,
            peak_memory_bytes=aggregated_metrics.get("peak_memory_bytes", 0),
            total_runs=aggregated_metrics.get("total_runs", 0),
        )

    # Use aggregated metrics if available, otherwise use current metrics
    final_metrics = aggregated_metrics if aggregated_metrics else metrics

    # Merge with existing metrics, using "evaluation" as the key
    # If evaluation key already exists, merge the metrics instead of overwriting
    if "evaluation" in existing_metrics:
        # Aggregate existing evaluation metrics with new ones
        existing_eval = existing_metrics["evaluation"]
        if isinstance(existing_eval, dict) and isinstance(final_metrics, dict):
            # Merge dictionaries with appropriate aggregation strategy:
            # numeric peaks take the max, everything else numeric is summed,
            # non-numeric values are overwritten by the new run.
            merged_eval = existing_eval.copy()
            for key, value in final_metrics.items():
                if (
                    key in merged_eval
                    and isinstance(merged_eval[key], (int, float))
                    and isinstance(value, (int, float))
                ):
                    # NOTE(review): the "runtime_seconds" branch duplicates the
                    # final else (both sum) — kept explicit, presumably for
                    # future divergence of the aggregation strategy.
                    if key in ["runtime_seconds"]:
                        merged_eval[key] += value
                    elif key in ["peak_memory_bytes", "peak_tree_memory_bytes"]:
                        merged_eval[key] = max(merged_eval[key], value)
                    else:
                        merged_eval[key] += value
                elif key == "end_time":
                    # NOTE(review): these three branches all overwrite with the
                    # new value; kept separate, apparently to document intent
                    # per key.
                    merged_eval[key] = value
                elif key == "start_time":
                    merged_eval[key] = value
                else:
                    merged_eval[key] = value
            merged_metrics = {**existing_metrics, "evaluation": merged_eval}
        else:
            merged_metrics = {**existing_metrics, "evaluation": final_metrics}
    else:
        merged_metrics = {**existing_metrics, "evaluation": final_metrics}

    # Write merged metrics to file
    with open(metrics_path, "w") as f:
        json.dump(merged_metrics, f, indent=2)

    # Final results payload; metadata_block contributes the "metadata" key
    # when metadata was provided.
    evaluation_result_dict = {
        "git_hash": os.getenv("CORE_EVALS_GIT_HASH"),
        "command": evaluation.render_command(),
        "config": evaluation.config.model_dump(exclude_none=True),
        "target": evaluation.target.model_dump(exclude_none=True),
        "results": evaluation_result.model_dump(exclude_none=True),
        **metadata_block,
    }

    logger.info(yaml.dump(evaluation_result_dict))

    with open(os.path.join(evaluation.config.output_dir, "results.yml"), "w") as f:
        yaml.dump(evaluation_result_dict, f)

    return evaluation_result


def evaluate(
    eval_cfg: EvaluationConfig,
    target_cfg: EvaluationTarget,
    metadata: Optional[EvaluationMetadata] = None,
) -> EvaluationResult:
    """Run an evaluation using configuration objects.

    Args:
        eval_cfg: Evaluation configuration object containing output directory,
            parameters, and evaluation type
        target_cfg: Target configuration object containing API endpoint details
            and adapter configuration
        metadata: Optional evaluation metadata persisted alongside results.

    Returns:
        EvaluationResult: Evaluation results and metadata
    """
    # Imported lazily to avoid import cycles at module load time.
    from nemo_evaluator.config import TelemetryLevel
    from nemo_evaluator.telemetry import (
        EvaluationTaskEvent,
        StatusEnum,
        TelemetryHandler,
        get_session_id,
        get_telemetry_level,
    )

    start_time = time.time()

    telemetry_handler = None
    telemetry_level = get_telemetry_level()
    if telemetry_level != TelemetryLevel.OFF:
        telemetry_handler = TelemetryHandler(
            source_client_version=__version__,
            session_id=get_session_id(),
            telemetry_level=telemetry_level,
        )
        telemetry_handler.start()

    run_config = {
        "config": eval_cfg.model_dump(),
        "target": target_cfg.model_dump(),
    }
    evaluation = validate_configuration(run_config)
    # NOTE(martas): verify_capabilities returns False if any of the checks failed
    # but we don't use it in any way here
    verify_capabilities(evaluation)
    prepare_output_directory(evaluation)
    # Remove markers left behind by a previous (interrupted/failed) run.
    _clear_interrupted_marker(evaluation.config.output_dir)
    _clear_marker(evaluation.config.output_dir, SERVER_ERROR_MARKER_FILENAME)

    model_name = (
        target_cfg.api_endpoint.model_id
        if target_cfg.api_endpoint and target_cfg.api_endpoint.model_id
        else "unknown"
    )
    # Only the DEFAULT telemetry level is allowed to report the real model id.
    model_name_for_telemetry = (
        model_name if telemetry_level == TelemetryLevel.DEFAULT else "redacted"
    )

    if telemetry_handler:
        telemetry_handler.enqueue(
            EvaluationTaskEvent(
                task=eval_cfg.type,
                framework_name=evaluation.framework_name,
                model=model_name_for_telemetry,
                status=StatusEnum.STARTED,
            )
        )

    status = StatusEnum.FAILURE
    try:
        result = _run_evaluation(evaluation, target_cfg, metadata)
        status = StatusEnum.SUCCESS
        return result
    finally:
        # Report the terminal status (and duration) even when the run raised.
        if telemetry_handler:
            telemetry_handler.enqueue(
                EvaluationTaskEvent(
                    task=eval_cfg.type,
                    framework_name=evaluation.framework_name,
                    model=model_name_for_telemetry,
                    execution_duration_seconds=time.time() - start_time,
                    status=status,
                )
            )
            telemetry_handler.stop()
def _write_with_versioning_header(
    out_dir: str,
    filename: str,
    payload: dict,
    versioning: dict,
):
    """Dump *payload* as YAML to ``out_dir/filename``, preceded by a commented
    header listing the component versions from *versioning*."""
    commented = yaml.safe_dump(versioning, sort_keys=False).rstrip("\n")
    header_lines = [
        "# Generated by nemo-evaluator invocation with the versions of components:"
    ]
    header_lines.extend(f"# {line}" for line in commented.splitlines())
    with open(os.path.join(out_dir, filename), "w") as f:
        f.write("\n".join(header_lines) + "\n")
        yaml.safe_dump(payload, f, sort_keys=False)


def _persist_metadata_and_build_results_block(
    out_dir: str, md: Optional[EvaluationMetadata]
) -> dict:
    """Persist the entire metadata object and return a results.yml block.

    Writes the full metadata payload to `metadata.yaml` and returns a
    "metadata.versioning" block which later will be included into results.

    NOTE(review): `_write_with_versioning_header` exists for writing a
    commented versioning header, but `metadata.yaml` is written below without
    one — confirm whether the helper should be used here.
    """
    if not md:
        return {}

    md_modified = copy.deepcopy(md)

    # Build versioning block first so we can include it as a commented header
    # in the persisted metadata file for better provenance/debuggability.
    updated_versioning: dict = dict(md_modified.get("versioning", {}))
    git_hash = os.getenv("CORE_EVALS_GIT_HASH")
    if git_hash:
        updated_versioning["git-hash"] = git_hash

    # TODO(agronskiy): we cannot import top level because due to alphabetic auto-sorting in
    # nemo_evaluator.__init__ this leads to circular imports. The said autosorting cannot
    # yet be ignored per-import since this functionality is not supported yet in
    # ruff.
    from nemo_evaluator import __version__ as nemo_evaluator_version

    updated_versioning["nemo_evaluator"] = nemo_evaluator_version

    # Construct full metadata payload to persist and return, augmenting versioning
    # with inferred fields (git-hash, nemo_evaluator_version).
    md_modified["versioning"] = updated_versioning
    with open(os.path.join(out_dir, "metadata.yaml"), "w") as f:
        yaml.safe_dump(md_modified, f, sort_keys=False)

    # For the results.yaml block we only return versioning to keep uncluttered
    return {
        "metadata": {
            "versioning": updated_versioning,
            "__skipped_fields": "see metadata.yaml for the rest of the fields",
        }
    }