# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import importlib
import json
import os
import signal
import sys
import time
from typing import Optional
import psutil
import yaml
from nemo_evaluator.adapters.server import AdapterServerProcess
from nemo_evaluator.api.api_dataclasses import (
Evaluation,
EvaluationConfig,
EvaluationMetadata,
EvaluationResult,
EvaluationTarget,
)
from nemo_evaluator.core.input import prepare_output_directory, validate_configuration
from nemo_evaluator.core.resources import (
aggregate_runtime_metrics,
monitor_memory_usage,
)
from nemo_evaluator.core.utils import run_command
from nemo_evaluator.logging import get_logger
from nemo_evaluator.package_info import __version__
logger = get_logger(__name__)
__all__ = ["evaluate"]
def parse_output(evaluation: Evaluation) -> EvaluationResult:
    """Parse evaluation output by importing the appropriate output module.

    Uses Python namespace package mechanism for both nemo_evaluator and core_evals.
    Tries nemo_evaluator namespace first, then falls back to core_evals.

    Args:
        evaluation: Validated evaluation whose ``pkg_name`` selects the harness
            and whose ``config.output_dir`` contains the raw harness output.

    Returns:
        EvaluationResult: Parsed evaluation results.

    Raises:
        ImportError: If neither namespace provides an output parser for the
            harness.
    """
    # Only the import itself sits inside each try-block: an ImportError raised
    # *while parsing* must propagate instead of being mistaken for a missing
    # module (previously the parse call was wrapped too, masking such errors
    # and triggering a spurious fallback).
    # Try nemo_evaluator namespace first (custom harnesses from .nemo_evaluator directories)
    try:
        output_module = importlib.import_module(
            f"nemo_evaluator.{evaluation.pkg_name}.output"
        )
    except (ImportError, ModuleNotFoundError):
        logger.debug(
            f"Module nemo_evaluator.{evaluation.pkg_name}.output not found, trying core_evals"
        )
    else:
        logger.debug(
            f"Loaded output parser from nemo_evaluator.{evaluation.pkg_name}.output"
        )
        return output_module.parse_output(evaluation.config.output_dir)
    # Fall back to core_evals namespace
    try:
        output_module = importlib.import_module(
            f"core_evals.{evaluation.pkg_name}.output"
        )
    except (ImportError, ModuleNotFoundError) as e:
        logger.error(
            f"Failed to import output parser for {evaluation.pkg_name}. "
            f"Tried both nemo_evaluator.{evaluation.pkg_name}.output and core_evals.{evaluation.pkg_name}.output"
        )
        raise ImportError(
            f"Could not find output parser for harness '{evaluation.pkg_name}'. "
            f"Make sure the harness is in nemo_evaluator or core_evals namespace."
        ) from e
    logger.debug(
        f"Loaded output parser from core_evals.{evaluation.pkg_name}.output"
    )
    return output_module.parse_output(evaluation.config.output_dir)
def _run_evaluation(
    evaluation: Evaluation,
    target_cfg: EvaluationTarget,
    metadata: Optional[EvaluationMetadata],
) -> EvaluationResult:
    """Core evaluation logic.

    Persists metadata, installs SIGTERM/SIGINT handlers that tear down the
    whole child-process tree, runs the rendered harness command (behind an
    adapter server unless the adapter is in client mode), monitors memory
    usage, merges runtime metrics into ``eval_factory_metrics.json``, and
    writes ``results.yml`` in the output directory.

    Args:
        evaluation: Validated evaluation object.
        target_cfg: Target configuration.
        metadata: Optional evaluation metadata.

    Returns:
        EvaluationResult: Evaluation results and metadata.
    """
    # Write metadata.yaml and obtain the "metadata" block later embedded in results.yml.
    metadata_block = _persist_metadata_and_build_results_block(
        evaluation.config.output_dir, metadata
    )
    # Check if adapter is in client mode (no server needed)
    use_client_mode = (
        target_cfg.api_endpoint
        and target_cfg.api_endpoint.adapter_config
        and target_cfg.api_endpoint.adapter_config.mode == "client"
    )
    # Track if graceful cleanup has been performed
    cleanup_performed = False
    def run_client_mode_cleanup() -> None:
        """Run post-eval hooks for client mode during graceful shutdown.

        Idempotent: both the SIGTERM handler and the normal client-mode path
        may call this; only the first call does any work.
        """
        nonlocal cleanup_performed
        if cleanup_performed:
            return
        cleanup_performed = True
        if (
            use_client_mode
            and target_cfg.api_endpoint
            and target_cfg.api_endpoint.adapter_config
        ):
            try:
                # Imported lazily so pipeline machinery is only loaded when
                # client mode is actually in use.
                from nemo_evaluator.adapters.pipeline import AdapterPipeline
                pipeline = AdapterPipeline(
                    target_cfg.api_endpoint.adapter_config,
                    evaluation.config.output_dir,
                    target_cfg.api_endpoint.model_id,
                )
                pipeline.run_post_eval_hooks(url=target_cfg.api_endpoint.url or "")
                logger.info("Post-eval hooks executed during shutdown")
            except Exception as e:
                # Best-effort: a failing hook must not abort the shutdown path.
                logger.error(f"Failed to run post-eval hooks during shutdown: {e}")
    def kill_all(signum=None, frame=None) -> None:
        """Kill all processes and exit.

        Signals every child process (SIGINT for fast shutdown, SIGTERM for
        graceful shutdown), force-kills stragglers after a timeout, then
        exits the current process with status 1.
        """
        logger.critical("FATAL: Terminating all processes...")
        # For SIGTERM (graceful shutdown), run client mode cleanup first
        if signum == signal.SIGTERM:
            run_client_mode_cleanup()
        parent = psutil.Process(os.getpid()) # current process
        children = parent.children(recursive=True)
        for child in children:
            if signum == signal.SIGINT:
                # Send SIGINT to children for immediate termination (skip post-eval hooks)
                child.send_signal(signal.SIGINT)
            else:
                # Send SIGTERM to children for graceful termination (run post-eval hooks)
                child.terminate()
        # Use faster timeout for keyboard interrupt (SIGINT)
        timeout = 1 if signum == signal.SIGINT else 5
        gone, alive = psutil.wait_procs(children, timeout=timeout)
        for child in alive:
            logger.warning(f"Force killing child process {child.pid}")
            child.kill()
        sys.exit(1)
    # Set up signal handlers
    signal.signal(signal.SIGTERM, kill_all)
    signal.signal(signal.SIGINT, kill_all)
    def run_evaluation_core() -> EvaluationResult:
        """Render and run the harness command, then parse its output."""
        # NOTE: if we use NeMoEvaluatorClient on the benchmark side, there's no need to
        # run adapter server for evaluation and we can use client model here
        if use_client_mode:
            logger.info("Using client mode - skipping adapter server")
            cmd = evaluation.render_command()
            run_command(cmd, verbose=True, propagate_errors=True)
            evaluation_result = parse_output(evaluation)
            # In client mode, explicitly run post-eval hooks after command completes
            # This ensures hooks run reliably from the parent process rather than
            # depending on finalizers in the subprocess during interpreter shutdown
            # Note: cleanup_performed flag prevents double execution if SIGTERM was received
            if not cleanup_performed:
                run_client_mode_cleanup()
            return evaluation_result
        else:
            with AdapterServerProcess(evaluation):
                cmd = evaluation.render_command()
                run_command(cmd, verbose=True, propagate_errors=True)
                evaluation_result = parse_output(evaluation)
                return evaluation_result
    # Get cache directory from caching interceptor configuration
    # (the first enabled "caching" interceptor with a cache_dir wins).
    cache_dir = None
    if (
        target_cfg.api_endpoint
        and target_cfg.api_endpoint.adapter_config
        and target_cfg.api_endpoint.adapter_config.interceptors
    ):
        for interceptor in target_cfg.api_endpoint.adapter_config.interceptors:
            if (
                interceptor.name == "caching"
                and interceptor.enabled
                and interceptor.config
                and interceptor.config.get("cache_dir")
            ):
                cache_dir = interceptor.config["cache_dir"]
                logger.info(f"Using caching interceptor cache_dir: {cache_dir}")
                break
    if not cache_dir:
        logger.info("No cache directory configured, token usage will not be collected")
    # Run the evaluation while sampling memory usage every 100 ms.
    evaluation_result, metrics = monitor_memory_usage(
        run_evaluation_core,
        interval_ms=100,
        cache_dir=cache_dir,
        output_dir=evaluation.config.output_dir,
    )
    metrics_path = os.path.join(
        evaluation.config.output_dir, "eval_factory_metrics.json"
    )
    # Read existing metrics if file exists
    existing_metrics = {}
    if os.path.exists(metrics_path):
        try:
            with open(metrics_path, "r") as f:
                existing_metrics = json.load(f)
        except (json.JSONDecodeError, IOError):
            pass # Start fresh if file is corrupted
    # Aggregate all run data from run_times directory
    aggregated_metrics = aggregate_runtime_metrics(evaluation.config.output_dir)
    if aggregated_metrics:
        runtime = aggregated_metrics.get("runtime_seconds", 0)
        inference_time = aggregated_metrics.get("inference_time_seconds", 0)
        scoring_time = aggregated_metrics.get("scoring_time_seconds", 0)
        logger.info(
            "Aggregated metrics",
            runtime_seconds=runtime,
            inference_time_seconds=inference_time,
            scoring_time_seconds=scoring_time,
            peak_memory_bytes=aggregated_metrics.get("peak_memory_bytes", 0),
            total_runs=aggregated_metrics.get("total_runs", 0),
        )
    # Use aggregated metrics if available, otherwise use current metrics
    final_metrics = aggregated_metrics if aggregated_metrics else metrics
    # Merge with existing metrics, using "evaluation" as the key
    # If evaluation key already exists, merge the metrics instead of overwriting
    if "evaluation" in existing_metrics:
        # Aggregate existing evaluation metrics with new ones:
        # numeric peaks take the max, other numeric values are summed,
        # everything else is overwritten by the newer value.
        existing_eval = existing_metrics["evaluation"]
        if isinstance(existing_eval, dict) and isinstance(final_metrics, dict):
            # Merge dictionaries with appropriate aggregation strategy
            merged_eval = existing_eval.copy()
            for key, value in final_metrics.items():
                if (
                    key in merged_eval
                    and isinstance(merged_eval[key], (int, float))
                    and isinstance(value, (int, float))
                ):
                    if key in ["runtime_seconds"]:
                        merged_eval[key] += value
                    elif key in ["peak_memory_bytes", "peak_tree_memory_bytes"]:
                        merged_eval[key] = max(merged_eval[key], value)
                    else:
                        # NOTE(review): identical to the "runtime_seconds"
                        # branch above — the special-casing is redundant.
                        merged_eval[key] += value
                elif key == "end_time":
                    merged_eval[key] = value
                elif key == "start_time":
                    # NOTE(review): overwrites with the newest start_time;
                    # earliest-start semantics would need min() — confirm intent.
                    merged_eval[key] = value
                else:
                    merged_eval[key] = value
            merged_metrics = {**existing_metrics, "evaluation": merged_eval}
        else:
            merged_metrics = {**existing_metrics, "evaluation": final_metrics}
    else:
        merged_metrics = {**existing_metrics, "evaluation": final_metrics}
    # Write merged metrics to file
    with open(metrics_path, "w") as f:
        json.dump(merged_metrics, f, indent=2)
    # Assemble the human-readable summary persisted as results.yml.
    evaluation_result_dict = {
        "git_hash": os.getenv("CORE_EVALS_GIT_HASH"),
        "command": evaluation.render_command(),
        "config": evaluation.config.model_dump(exclude_none=True),
        "target": evaluation.target.model_dump(exclude_none=True),
        "results": evaluation_result.model_dump(exclude_none=True),
        **metadata_block,
    }
    logger.info(yaml.dump(evaluation_result_dict))
    with open(os.path.join(evaluation.config.output_dir, "results.yml"), "w") as f:
        yaml.dump(evaluation_result_dict, f)
    return evaluation_result
def evaluate(
    eval_cfg: EvaluationConfig,
    target_cfg: EvaluationTarget,
    metadata: Optional[EvaluationMetadata] = None,
) -> EvaluationResult:
    """
    Run an evaluation using configuration objects.

    Args:
        eval_cfg: Evaluation configuration object containing output directory,
            parameters, and evaluation type
        target_cfg: Target configuration object containing API endpoint details
            and adapter configuration
        metadata: Optional evaluation metadata; when provided it is persisted
            to the output directory and summarized in results.yml

    Returns:
        EvaluationResult: Evaluation results and metadata
    """
    # Imported lazily to avoid circular imports at module load time.
    from nemo_evaluator.config import TelemetryLevel
    from nemo_evaluator.telemetry import (
        EvaluationTaskEvent,
        StatusEnum,
        TelemetryHandler,
        get_session_id,
        get_telemetry_level,
    )

    start_time = time.time()
    telemetry_handler = None
    telemetry_level = get_telemetry_level()
    if telemetry_level != TelemetryLevel.OFF:
        telemetry_handler = TelemetryHandler(
            source_client_version=__version__,
            session_id=get_session_id(),
            telemetry_level=telemetry_level,
        )
        telemetry_handler.start()
    run_config = {
        "config": eval_cfg.model_dump(),
        "target": target_cfg.model_dump(),
    }
    evaluation = validate_configuration(run_config)
    prepare_output_directory(evaluation)
    model_name = (
        target_cfg.api_endpoint.model_id
        if target_cfg.api_endpoint and target_cfg.api_endpoint.model_id
        else "unknown"
    )
    # Only expose the real model name in telemetry at the DEFAULT level.
    model_name_for_telemetry = (
        model_name if telemetry_level == TelemetryLevel.DEFAULT else "redacted"
    )
    if telemetry_handler:
        telemetry_handler.enqueue(
            EvaluationTaskEvent(
                task=eval_cfg.type,
                framework_name=evaluation.framework_name,
                model=model_name_for_telemetry,
                status=StatusEnum.STARTED,
            )
        )
    status = StatusEnum.FAILURE
    try:
        result = _run_evaluation(evaluation, target_cfg, metadata)
        status = StatusEnum.SUCCESS
        return result
    finally:
        # Always report the terminal status (SUCCESS only if _run_evaluation
        # returned) and shut the telemetry handler down, even on error.
        if telemetry_handler:
            telemetry_handler.enqueue(
                EvaluationTaskEvent(
                    task=eval_cfg.type,
                    framework_name=evaluation.framework_name,
                    model=model_name_for_telemetry,
                    execution_duration_seconds=time.time() - start_time,
                    status=status,
                )
            )
            telemetry_handler.stop()
def _write_with_versioning_header(
    out_dir: str,
    filename: str,
    payload: dict,
    versioning: dict,
):
    """Write *payload* as YAML to ``out_dir/filename`` behind a commented header.

    The header lists the component versions from *versioning*, with every
    line prefixed by ``# `` so the resulting file stays valid YAML.
    """
    dumped_versions = yaml.safe_dump(versioning, sort_keys=False).rstrip("\n")
    commented = [
        "# Generated by nemo-evaluator invocation with the versions of components:"
    ]
    commented.extend(f"# {row}" for row in dumped_versions.splitlines())
    target_path = os.path.join(out_dir, filename)
    with open(target_path, "w") as handle:
        handle.write("\n".join(commented) + "\n")
        yaml.safe_dump(payload, handle, sort_keys=False)
def _persist_metadata_and_build_results_block(
out_dir: str, md: Optional[EvaluationMetadata]
) -> dict:
"""Persist the entire metadata object and return a results.yml block.
Writes the full metadata payload to `metadata.yaml` with a versioning header.
Returns "metadata.verioning" block which later will be included into results.
"""
if not md:
return {}
md_modified = copy.deepcopy(md)
# Build versioning block first so we can include it as a commented header
# in the persisted metadata file for better provenance/debuggability.
updated_versioning: dict = dict(md_modified.get("versioning", {}))
git_hash = os.getenv("CORE_EVALS_GIT_HASH")
if git_hash:
updated_versioning["git-hash"] = git_hash
# TODO(agronskiy): we cannot import top level because due to alphabetic auto-sorting in
# nemo_evaluator.__init__ this leads to circular imports. The said autosorting cannot
# yet be ignored per-import since this functionality is not supported yet in
# ruff.
from nemo_evaluator import __version__ as nemo_evaluator_version
updated_versioning["nemo_evaluator"] = nemo_evaluator_version
# Construct full metadata payload to persist and return, augmenting versioning
# with inferred fields (git-hash, nemo_evaluator_version).
md_modified["versioning"] = updated_versioning
with open(os.path.join(out_dir, "metadata.yaml"), "w") as f:
yaml.safe_dump(md_modified, f, sort_keys=False)
# For the results.yaml block we only return versioning to keep uncluttered
return {
"metadata": {
"versioning": updated_versioning,
"__skipped_fields": "see metadata.yaml for the rest of the fields",
}
}