# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import importlib
import json
import math
import os
import signal
import sys
import time
from typing import Optional
import psutil
import yaml
from nemo_evaluator.adapters.server import AdapterServerProcess
from nemo_evaluator.api.api_dataclasses import (
Evaluation,
EvaluationConfig,
EvaluationMetadata,
EvaluationResult,
EvaluationTarget,
)
from nemo_evaluator.core.input import (
prepare_output_directory,
validate_configuration,
verify_capabilities,
)
from nemo_evaluator.core.resources import (
aggregate_runtime_metrics,
monitor_memory_usage,
)
from nemo_evaluator.core.utils import run_command
from nemo_evaluator.logging import get_logger
from nemo_evaluator.package_info import __version__
logger = get_logger(__name__)
# Public API of this module.
__all__ = ["evaluate", "INTERRUPTED_MARKER_FILENAME"]
# Env var that overrides the graceful-shutdown timeout (seconds) for children.
SHUTDOWN_TIMEOUT_ENV_VAR = "NEMO_EVALUATOR_SHUTDOWN_TIMEOUT_SECONDS"
# Fallback timeout used when the env var is unset or not a positive finite number.
DEFAULT_SHUTDOWN_TIMEOUT_SECONDS = 30.0
# Marker file written to the output dir when a run is interrupted by a signal.
INTERRUPTED_MARKER_FILENAME = ".nemo_evaluator_interrupted"
# Marker file whose presence (checked on SIGTERM) signals a server-side error.
SERVER_ERROR_MARKER_FILENAME = ".nemo_evaluator_server_error"
def _get_interrupted_marker_path(output_dir: str) -> str:
    """Return the full path of the interruption marker inside *output_dir*."""
    marker = os.path.join(output_dir, INTERRUPTED_MARKER_FILENAME)
    return marker
def _clear_marker(output_dir: str, filename: str) -> None:
marker_path = os.path.join(output_dir, filename)
try:
os.remove(marker_path)
except FileNotFoundError:
return
except OSError as e:
logger.warning(f"Failed to remove marker {marker_path}: {e}")
def _clear_interrupted_marker(output_dir: str) -> None:
    """Remove any stale interruption marker left by a previous run."""
    _clear_marker(output_dir, INTERRUPTED_MARKER_FILENAME)
def _write_interrupted_marker(output_dir: str, signum: Optional[int] = None) -> None:
    """Persist a JSON marker recording that the run was interrupted.

    The marker stores the signal name (or raw number for unknown signals)
    and a wall-clock timestamp. Write failures are logged, not raised.
    """
    marker_path = _get_interrupted_marker_path(output_dir)
    if signum is None:
        signal_name = None
    else:
        try:
            signal_name = signal.Signals(signum).name
        except ValueError:
            # Not a known signal number; fall back to its string form.
            signal_name = str(signum)
    payload = {"signal": signal_name, "timestamp": time.time()}
    try:
        with open(marker_path, "w") as f:
            json.dump(payload, f)
    except OSError as e:
        logger.warning(f"Failed to write interruption marker {marker_path}: {e}")
def _get_shutdown_timeout_seconds() -> float:
    """Return the graceful shutdown timeout for child processes."""
    raw_timeout = os.getenv(SHUTDOWN_TIMEOUT_ENV_VAR)
    if raw_timeout is None:
        return DEFAULT_SHUTDOWN_TIMEOUT_SECONDS
    try:
        parsed = float(raw_timeout)
    except ValueError:
        logger.warning(
            f"Invalid {SHUTDOWN_TIMEOUT_ENV_VAR}={raw_timeout!r}; "
            f"falling back to {DEFAULT_SHUTDOWN_TIMEOUT_SECONDS} seconds"
        )
        return DEFAULT_SHUTDOWN_TIMEOUT_SECONDS
    # Reject NaN, +/-inf, zero and negatives — only positive finite values
    # are usable as a wait timeout.
    if math.isfinite(parsed) and parsed > 0:
        return parsed
    logger.warning(
        f"{SHUTDOWN_TIMEOUT_ENV_VAR}={raw_timeout!r} is not a positive finite number; "
        f"falling back to {DEFAULT_SHUTDOWN_TIMEOUT_SECONDS} seconds"
    )
    return DEFAULT_SHUTDOWN_TIMEOUT_SECONDS
def parse_output(evaluation: Evaluation) -> EvaluationResult:
    """Parse evaluation output by importing the appropriate output module.
    Uses Python namespace package mechanism for both nemo_evaluator and core_evals.
    Tries nemo_evaluator namespace first, then falls back to core_evals.
    """
    pkg = evaluation.pkg_name
    out_dir = evaluation.config.output_dir
    # Preferred location: custom harnesses in the nemo_evaluator namespace.
    try:
        module = importlib.import_module(f"nemo_evaluator.{pkg}.output")
        logger.debug(f"Loaded output parser from nemo_evaluator.{pkg}.output")
        return module.parse_output(out_dir)
    except (ImportError, ModuleNotFoundError):
        logger.debug(
            f"Module nemo_evaluator.{pkg}.output not found, trying core_evals"
        )
    # Fallback location: legacy core_evals namespace.
    try:
        module = importlib.import_module(f"core_evals.{pkg}.output")
        logger.debug(f"Loaded output parser from core_evals.{pkg}.output")
        return module.parse_output(out_dir)
    except (ImportError, ModuleNotFoundError) as e:
        logger.error(
            f"Failed to import output parser for {pkg}. "
            f"Tried both nemo_evaluator.{pkg}.output and core_evals.{pkg}.output"
        )
        raise ImportError(
            f"Could not find output parser for harness '{pkg}'. "
            f"Make sure the harness is in nemo_evaluator or core_evals namespace."
        ) from e
def _run_evaluation(
    evaluation: Evaluation,
    target_cfg: EvaluationTarget,
    metadata: Optional[EvaluationMetadata],
) -> EvaluationResult:
    """Core evaluation logic.

    Runs the rendered framework command (behind an adapter server unless the
    adapter is in client mode), parses the harness output, persists merged
    runtime metrics to ``eval_factory_metrics.json`` and the final summary to
    ``results.yml``. Installs SIGINT/SIGTERM handlers that terminate child
    processes and (for SIGTERM in client mode) run post-eval hooks.

    Args:
        evaluation: Validated evaluation object.
        target_cfg: Target configuration.
        metadata: Optional evaluation metadata.
    Returns:
        EvaluationResult: Evaluation results and metadata.
    """
    metadata_block = _persist_metadata_and_build_results_block(
        evaluation.config.output_dir, metadata
    )
    # Check if adapter is in client mode (no server needed)
    use_client_mode = (
        target_cfg.api_endpoint
        and target_cfg.api_endpoint.adapter_config
        and target_cfg.api_endpoint.adapter_config.mode == "client"
    )
    # Import eagerly so it is available inside the signal handler without
    # risking a deadlock on the import lock.
    from nemo_evaluator.adapters.pipeline import AdapterPipeline

    # Cache timeout so the signal handler avoids redundant env-var parsing.
    shutdown_timeout = _get_shutdown_timeout_seconds()
    # Track if graceful cleanup has been performed (None = not attempted yet).
    cleanup_status: Optional[bool] = None

    def run_client_mode_cleanup() -> bool:
        """Run post-eval hooks for client mode during graceful shutdown."""
        nonlocal cleanup_status
        if cleanup_status is not None:
            # Already attempted once; never run hooks twice.
            return cleanup_status
        cleanup_status = True
        if not (
            use_client_mode
            and target_cfg.api_endpoint
            and target_cfg.api_endpoint.adapter_config
        ):
            return cleanup_status
        try:
            pipeline = AdapterPipeline(
                target_cfg.api_endpoint.adapter_config,
                evaluation.config.output_dir,
                target_cfg.api_endpoint.model_id,
            )
            pipeline.run_post_eval_hooks(url=target_cfg.api_endpoint.url or "")
            logger.info("Post-eval hooks executed during shutdown")
        except Exception as e:
            cleanup_status = False
            logger.error(f"Failed to run post-eval hooks during shutdown: {e}")
        return cleanup_status

    def kill_all(signum=None, frame=None):
        """Kill all processes and exit."""
        # If another SIGINT/SIGTERM arrives during shutdown, use default handling
        # instead of re-entering kill_all and repeating cleanup/termination logic.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        logger.info("Terminating all processes...")
        server_error_shutdown = False
        cleanup_succeeded = True
        if signum == signal.SIGTERM:
            # A SIGTERM caused by a server-side failure is marked on disk;
            # in that case this is not a user interruption.
            server_error_path = os.path.join(
                evaluation.config.output_dir, SERVER_ERROR_MARKER_FILENAME
            )
            server_error_shutdown = os.path.exists(server_error_path)
        if not server_error_shutdown:
            _write_interrupted_marker(evaluation.config.output_dir, signum)
        if signum != signal.SIGINT:
            cleanup_succeeded = run_client_mode_cleanup()
        parent = psutil.Process(os.getpid())  # current process
        children = parent.children(recursive=True)
        for child in children:
            if signum == signal.SIGINT:
                # Send SIGINT to children for immediate termination (skip post-eval hooks)
                child.send_signal(signal.SIGINT)
            else:
                # Send SIGTERM to children for graceful termination (run post-eval hooks)
                child.terminate()
        timeout = 1 if signum == signal.SIGINT else shutdown_timeout
        _, alive = psutil.wait_procs(children, timeout=timeout)
        for child in alive:
            logger.warning(
                f"Force killing child process {child.pid} after waiting {timeout} seconds"
            )
            child.kill()
        if signum == signal.SIGINT:
            # Conventional "killed by signal N" exit code.
            exit_code = 128 + signal.SIGINT
        else:
            exit_code = (
                1
                if server_error_shutdown
                else 0
                if cleanup_succeeded and not alive
                else 1
            )
        sys.exit(exit_code)

    # Set up signal handlers
    signal.signal(signal.SIGTERM, kill_all)
    signal.signal(signal.SIGINT, kill_all)

    def run_evaluation_core():
        # NOTE: if we use NeMoEvaluatorClient on the benchmark side, there's no need to
        # run adapter server for evaluation and we can use client model here
        if use_client_mode:
            logger.info("Using client mode - skipping adapter server")
            cmd = evaluation.render_command()
            run_command(cmd, verbose=True, propagate_errors=True)
            evaluation_result = parse_output(evaluation)
            # In client mode, explicitly run post-eval hooks after command completes
            # This ensures hooks run reliably from the parent process rather than
            # depending on finalizers in the subprocess during interpreter shutdown
            # Note: cleanup_status prevents double execution if a signal was received
            if cleanup_status is None:
                run_client_mode_cleanup()
            return evaluation_result
        else:
            with AdapterServerProcess(evaluation):
                cmd = evaluation.render_command()
                run_command(cmd, verbose=True, propagate_errors=True)
                evaluation_result = parse_output(evaluation)
                return evaluation_result

    # Get cache directory from caching interceptor configuration
    cache_dir = None
    if (
        target_cfg.api_endpoint
        and target_cfg.api_endpoint.adapter_config
        and target_cfg.api_endpoint.adapter_config.interceptors
    ):
        for interceptor in target_cfg.api_endpoint.adapter_config.interceptors:
            if (
                interceptor.name == "caching"
                and interceptor.enabled
                and interceptor.config
                and interceptor.config.get("cache_dir")
            ):
                cache_dir = interceptor.config["cache_dir"]
                logger.info(f"Using caching interceptor cache_dir: {cache_dir}")
                break
    if not cache_dir:
        logger.info("No cache directory configured, token usage will not be collected")

    evaluation_result, metrics = monitor_memory_usage(
        run_evaluation_core,
        interval_ms=100,
        cache_dir=cache_dir,
        output_dir=evaluation.config.output_dir,
    )
    metrics_path = os.path.join(
        evaluation.config.output_dir, "eval_factory_metrics.json"
    )
    # Read existing metrics if file exists
    existing_metrics = {}
    if os.path.exists(metrics_path):
        try:
            with open(metrics_path, "r") as f:
                existing_metrics = json.load(f)
        except (json.JSONDecodeError, IOError):
            pass  # Start fresh if file is corrupted
    # Aggregate all run data from run_times directory
    aggregated_metrics = aggregate_runtime_metrics(evaluation.config.output_dir)
    if aggregated_metrics:
        runtime = aggregated_metrics.get("runtime_seconds", 0)
        inference_time = aggregated_metrics.get("inference_time_seconds", 0)
        scoring_time = aggregated_metrics.get("scoring_time_seconds", 0)
        logger.info(
            "Aggregated metrics",
            runtime_seconds=runtime,
            inference_time_seconds=inference_time,
            scoring_time_seconds=scoring_time,
            peak_memory_bytes=aggregated_metrics.get("peak_memory_bytes", 0),
            total_runs=aggregated_metrics.get("total_runs", 0),
        )
    # Use aggregated metrics if available, otherwise use current metrics
    final_metrics = aggregated_metrics if aggregated_metrics else metrics
    # Merge with existing metrics, using "evaluation" as the key
    # If evaluation key already exists, merge the metrics instead of overwriting
    if "evaluation" in existing_metrics:
        # Aggregate existing evaluation metrics with new ones
        existing_eval = existing_metrics["evaluation"]
        if isinstance(existing_eval, dict) and isinstance(final_metrics, dict):
            # Merge dictionaries with appropriate aggregation strategy
            merged_eval = existing_eval.copy()
            for key, value in final_metrics.items():
                # BUGFIX: timestamps must be checked BEFORE the generic numeric
                # branch — they are numeric, so the old order summed two Unix
                # timestamps instead of replacing them. Keep the newest values.
                if key in ("start_time", "end_time"):
                    merged_eval[key] = value
                elif (
                    key in merged_eval
                    and isinstance(merged_eval[key], (int, float))
                    and isinstance(value, (int, float))
                ):
                    if key in ["peak_memory_bytes", "peak_tree_memory_bytes"]:
                        # Peaks are combined with max, not summed.
                        merged_eval[key] = max(merged_eval[key], value)
                    else:
                        # Durations and counters accumulate across runs.
                        merged_eval[key] += value
                else:
                    merged_eval[key] = value
            merged_metrics = {**existing_metrics, "evaluation": merged_eval}
        else:
            merged_metrics = {**existing_metrics, "evaluation": final_metrics}
    else:
        merged_metrics = {**existing_metrics, "evaluation": final_metrics}
    # Write merged metrics to file
    with open(metrics_path, "w") as f:
        json.dump(merged_metrics, f, indent=2)

    evaluation_result_dict = {
        "git_hash": os.getenv("CORE_EVALS_GIT_HASH"),
        "command": evaluation.render_command(),
        "config": evaluation.config.model_dump(exclude_none=True),
        "target": evaluation.target.model_dump(exclude_none=True),
        "results": evaluation_result.model_dump(exclude_none=True),
        **metadata_block,
    }
    logger.info(yaml.dump(evaluation_result_dict))
    with open(os.path.join(evaluation.config.output_dir, "results.yml"), "w") as f:
        yaml.dump(evaluation_result_dict, f)
    return evaluation_result
def evaluate(
    eval_cfg: EvaluationConfig,
    target_cfg: EvaluationTarget,
    metadata: Optional[EvaluationMetadata] = None,
) -> EvaluationResult:
    """
    Run an evaluation using configuration objects.

    Args:
        eval_cfg: Evaluation configuration object containing output directory,
            parameters, and evaluation type
        target_cfg: Target configuration object containing API endpoint details
            and adapter configuration
        metadata: Optional evaluation metadata persisted alongside the results

    Returns:
        EvaluationResult: Evaluation results and metadata
    """
    # NOTE: removed a stray "[docs]" token (Sphinx doc-extraction artifact)
    # that preceded this definition — it raised NameError at import time.
    # Function-level import: importing these at module top level can create
    # import cycles within nemo_evaluator (see the TODO near the bottom of
    # this module).
    from nemo_evaluator.config import TelemetryLevel
    from nemo_evaluator.telemetry import (
        EvaluationTaskEvent,
        StatusEnum,
        TelemetryHandler,
        get_session_id,
        get_telemetry_level,
    )

    start_time = time.time()
    telemetry_handler = None
    telemetry_level = get_telemetry_level()
    if telemetry_level != TelemetryLevel.OFF:
        telemetry_handler = TelemetryHandler(
            source_client_version=__version__,
            session_id=get_session_id(),
            telemetry_level=telemetry_level,
        )
        telemetry_handler.start()
    run_config = {
        "config": eval_cfg.model_dump(),
        "target": target_cfg.model_dump(),
    }
    evaluation = validate_configuration(run_config)
    # NOTE(martas): verify_capabilities returns False if any of the checks failed
    # but we don't use it in any way here
    verify_capabilities(evaluation)
    prepare_output_directory(evaluation)
    # Remove stale markers from any previous run in the same output directory.
    _clear_interrupted_marker(evaluation.config.output_dir)
    _clear_marker(evaluation.config.output_dir, SERVER_ERROR_MARKER_FILENAME)
    model_name = (
        target_cfg.api_endpoint.model_id
        if target_cfg.api_endpoint and target_cfg.api_endpoint.model_id
        else "unknown"
    )
    # Only report the real model name at the DEFAULT telemetry level.
    model_name_for_telemetry = (
        model_name if telemetry_level == TelemetryLevel.DEFAULT else "redacted"
    )
    if telemetry_handler:
        telemetry_handler.enqueue(
            EvaluationTaskEvent(
                task=eval_cfg.type,
                framework_name=evaluation.framework_name,
                model=model_name_for_telemetry,
                status=StatusEnum.STARTED,
            )
        )
    status = StatusEnum.FAILURE
    try:
        result = _run_evaluation(evaluation, target_cfg, metadata)
        status = StatusEnum.SUCCESS
        return result
    finally:
        # Always emit the terminal telemetry event, success or failure.
        if telemetry_handler:
            telemetry_handler.enqueue(
                EvaluationTaskEvent(
                    task=eval_cfg.type,
                    framework_name=evaluation.framework_name,
                    model=model_name_for_telemetry,
                    execution_duration_seconds=time.time() - start_time,
                    status=status,
                )
            )
            telemetry_handler.stop()
def _write_with_versioning_header(
    out_dir: str,
    filename: str,
    payload: dict,
    versioning: dict,
):
    """Write *payload* as YAML to *out_dir*/*filename*, prefixed by a
    comment header listing the component versions in *versioning*."""
    # Render the versioning mapping and turn each line into a YAML comment.
    versions_yaml = yaml.safe_dump(versioning, sort_keys=False).rstrip("\n")
    commented_versions = "\n".join(f"# {line}" for line in versions_yaml.splitlines())
    header = (
        "# Generated by nemo-evaluator invocation with the versions of components:\n"
        + commented_versions
        + "\n"
    )
    destination = os.path.join(out_dir, filename)
    with open(destination, "w") as f:
        f.write(header)
        yaml.safe_dump(payload, f, sort_keys=False)
def _persist_metadata_and_build_results_block(
    out_dir: str, md: Optional[EvaluationMetadata]
) -> dict:
    """Persist the entire metadata object and return a results.yml block.
    Writes the full metadata payload (with an augmented ``versioning`` section)
    to ``metadata.yaml``. Returns a "metadata.versioning" block which later
    will be included into results; an empty dict when no metadata is given.
    """
    if not md:
        return {}
    # NOTE(review): `md` is accessed via .get()/item assignment below, so it is
    # treated as dict-like despite the EvaluationMetadata annotation — confirm.
    # Deep copy so the caller's metadata object is never mutated.
    md_modified = copy.deepcopy(md)
    # Build versioning block first so we can include it as a commented header
    # in the persisted metadata file for better provenance/debuggability.
    updated_versioning: dict = dict(md_modified.get("versioning", {}))
    git_hash = os.getenv("CORE_EVALS_GIT_HASH")
    if git_hash:
        updated_versioning["git-hash"] = git_hash
    # TODO(agronskiy): we cannot import top level because due to alphabetic auto-sorting in
    # nemo_evaluator.__init__ this leads to circular imports. The said autosorting cannot
    # yet be ignored per-import since this functionality is not supported yet in
    # ruff.
    from nemo_evaluator import __version__ as nemo_evaluator_version
    updated_versioning["nemo_evaluator"] = nemo_evaluator_version
    # Construct full metadata payload to persist and return, augmenting versioning
    # with inferred fields (git-hash, nemo_evaluator_version).
    md_modified["versioning"] = updated_versioning
    with open(os.path.join(out_dir, "metadata.yaml"), "w") as f:
        yaml.safe_dump(md_modified, f, sort_keys=False)
    # For the results.yaml block we only return versioning to keep uncluttered
    return {
        "metadata": {
            "versioning": updated_versioning,
            "__skipped_fields": "see metadata.yaml for the rest of the fields",
        }
    }