Source code for nemo_evaluator.adapters.adapter_config

# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any

from pydantic import BaseModel, Field


[docs] class DiscoveryConfig(BaseModel): """Configuration for discovering 3rd party modules and directories""" modules: list[str] = Field( description="List of module paths to discover", default_factory=list, ) dirs: list[str] = Field( description="List of directory paths to discover", default_factory=list, )
[docs] class InterceptorConfig(BaseModel): """Configuration for a single interceptor""" name: str = Field(description="Name of the interceptor to use") enabled: bool = Field( description="Whether this interceptor is enabled", default=True ) config: dict[str, Any] = Field( description="Configuration for the interceptor", default_factory=dict )
[docs] class PostEvalHookConfig(BaseModel): """Configuration for a single post-evaluation hook""" name: str = Field(description="Name of the post-evaluation hook to use") enabled: bool = Field( description="Whether this post-evaluation hook is enabled", default=True ) config: dict[str, Any] = Field( description="Configuration for the post-evaluation hook", default_factory=dict )
[docs] class Config: use_enum_values = True
[docs] class AdapterConfig(BaseModel): """Adapter configuration with registry-based interceptor support""" discovery: DiscoveryConfig = Field( description="Configuration for discovering 3rd party modules and directories", default_factory=DiscoveryConfig, ) interceptors: list[InterceptorConfig] = Field( description="List of interceptors to use with their configurations", default_factory=list, ) post_eval_hooks: list[PostEvalHookConfig] = Field( description="List of post-evaluation hooks to use with their configurations", default_factory=list, ) endpoint_type: str = Field( description="Type of the endpoint to run the adapter for", default="chat", ) log_failed_requests: bool = Field( description="Whether to log failed requests", default=False, )
[docs] @classmethod def get_legacy_defaults(cls) -> dict[str, Any]: """Get default values for legacy configuration parameters.""" return { "generate_html_report": True, "html_report_size": 5, "tracking_requests_stats": True, "caching_dir": None, "log_failed_requests": cls.model_fields["log_failed_requests"].default, "endpoint_type": cls.model_fields["endpoint_type"].default, # Boolean defaults for optional features "use_caching": True, "save_responses": False, "save_requests": False, "use_system_prompt": False, "use_omni_info": False, "use_request_logging": False, "use_nvcf": False, "use_response_logging": False, "use_reasoning": False, "process_reasoning_traces": False, "use_progress_tracking": False, "use_raise_client_errors": False, "include_json": True, "custom_system_prompt": None, "output_dir": None, "params_to_add": None, "params_to_remove": None, "params_to_rename": None, "max_logged_requests": None, "max_logged_responses": None, "max_saved_requests": None, "max_saved_responses": None, "start_reasoning_token": None, "include_if_reasoning_not_finished": None, "track_reasoning": None, "end_reasoning_token": "</think>", "progress_tracking_url": None, "progress_tracking_interval": 1, "logging_aggregated_stats_interval": 100, }
[docs] @classmethod def get_validated_config(cls, run_config: dict[str, Any]) -> "AdapterConfig": """Extract and validate adapter configuration from run_config. Args: run_config: The run configuration dictionary Returns: AdapterConfig instance if adapter_config is present in run_config, None otherwise Raises: ValueError: If adapter_config is present but invalid """ def merge_discovery( global_discovery: dict[str, Any], local_discovery: dict[str, Any] ) -> dict[str, Any]: """Merge global and local discovery configs.""" return { "modules": global_discovery.get("modules", []) + local_discovery.get("modules", []), "dirs": global_discovery.get("dirs", []) + local_discovery.get("dirs", []), } global_cfg = run_config.get("global_adapter_config", {}) local_cfg = ( run_config.get("target", {}).get("api_endpoint", {}).get("adapter_config") ) # Validate that legacy parameters are not mixed with interceptors legacy_defaults = cls.get_legacy_defaults() model_fields = set(cls.model_fields.keys()) legacy_only_params = set(legacy_defaults.keys()) - model_fields for config_name, config in [ ("global_adapter_config", global_cfg), ("target.api_endpoint.adapter_config", local_cfg), ]: if config and config.get("interceptors"): found_legacy = [p for p in legacy_only_params if p in config] if found_legacy: raise ValueError( f"Cannot use legacy configuration parameters when interceptors are explicitly defined in {config_name}. " f"Found: {', '.join(sorted(found_legacy))}. " f"Please remove these and configure using interceptors instead." ) if not global_cfg and not local_cfg: # Create default adapter config with caching enabled by default return cls.from_legacy_config({}, run_config) merged = dict(global_cfg) if global_cfg else {} if local_cfg: local_discovery = local_cfg.get("discovery") global_discovery = merged.get("discovery") if local_discovery and global_discovery: merged["discovery"] = merge_discovery(global_discovery, local_discovery) # Add/override other local fields for k, v in local_cfg.items(): if k != "discovery": merged[k] = v else: merged.update(local_cfg) # Syntactic sugar, we allow `interceptors` list in non-typed (pre-validation) # `adapter_config` to contain also plain strings, which will be treated # as `name: <this string>` if isinstance(merged.get("interceptors"), list): merged["interceptors"] = [ {"name": s} if isinstance(s, str) else s for s in merged["interceptors"] ] # Syntactic sugar for post_eval_hooks as well if isinstance(merged.get("post_eval_hooks"), list): merged["post_eval_hooks"] = [ {"name": s} if isinstance(s, str) else s for s in merged["post_eval_hooks"] ] try: config = cls(**merged) # If no interceptors are configured, try to convert from legacy format if not config.interceptors: config = cls.from_legacy_config(merged, run_config) return config except Exception as e: raise ValueError(f"Invalid adapter configuration: {e}") from e
@staticmethod def _get_default_output_dir( legacy_config: dict[str, Any], run_config: dict[str, Any] | None = None ) -> str | None: """Get default output directory based on configuration priority. Args: legacy_config: Legacy configuration dictionary run_config: Full run configuration dictionary (optional) Returns: output directory path based on priority: legacy_config.output_dir > run_config.config.output_dir > None """ # First try legacy_config, but handle KeyError if not present output_dir = legacy_config.get("output_dir") if output_dir is None and run_config: output_dir = run_config.get("config", {}).get("output_dir") return output_dir @staticmethod def _get_default_cache_dir( legacy_config: dict[str, Any], run_config: dict[str, Any] | None = None, subdir: str = "cache", ) -> str: """Get default cache directory based on configuration priority. Args: legacy_config: Legacy configuration dictionary run_config: Full run configuration dictionary (optional) subdir: Subdirectory name to append to output_dir (default: "cache") Returns: cache directory path based on priority: caching_dir > output_dir/{subdir} > /tmp/{subdir} """ # First try caching_dir from legacy config cache_dir = legacy_config["caching_dir"] if cache_dir is not None: return f"{cache_dir}/{subdir}" # Fallback to output_dir/{subdir} output_dir = AdapterConfig._get_default_output_dir(legacy_config, run_config) if output_dir: return f"{output_dir}/{subdir}" # Final fallback to /tmp/{subdir} return f"/tmp/{subdir}"
[docs] @classmethod def from_legacy_config( cls, legacy_config: dict[str, Any], run_config: dict[str, Any] | None = None ) -> "AdapterConfig": """Convert legacy configuration to new interceptor-based format. Args: legacy_config: Legacy configuration dictionary run_config: Full run configuration dictionary (optional, used to extract output_dir) Returns: AdapterConfig instance with interceptors based on legacy config """ # Merge legacy config with defaults to avoid repeated .get() calls defaults = cls.get_legacy_defaults() legacy_config = {**defaults, **legacy_config} interceptors = [] post_eval_hooks = [] # Add system message interceptor if custom system prompt is specified (Request) if ( legacy_config["use_system_prompt"] and legacy_config["custom_system_prompt"] is not None ): interceptors.append( InterceptorConfig( name="system_message", enabled=True, config={ "system_message": legacy_config["custom_system_prompt"], }, ) ) # Add payload modifier interceptor if any payload modification parameters are specified (RequestToResponse) params_to_add = legacy_config["params_to_add"] params_to_remove = legacy_config["params_to_remove"] params_to_rename = legacy_config["params_to_rename"] if params_to_add or params_to_remove or params_to_rename: config = {} if params_to_add: config["params_to_add"] = params_to_add if params_to_remove: config["params_to_remove"] = params_to_remove if params_to_rename: config["params_to_rename"] = params_to_rename interceptors.append( InterceptorConfig( name="payload_modifier", enabled=True, config=config, ) ) # Add omni info interceptor if specified (Request) if legacy_config["use_omni_info"]: interceptors.append( InterceptorConfig( name="omni_info", enabled=True, config={ "output_dir": cls._get_default_output_dir( legacy_config, run_config ), }, ) ) # Convert legacy fields to interceptors (Request) if legacy_config["use_request_logging"]: config = { "output_dir": cls._get_default_output_dir(legacy_config, run_config) } if legacy_config["max_logged_requests"] is not None: config["max_requests"] = legacy_config["max_logged_requests"] interceptors.append( InterceptorConfig( name="request_logging", config=config, ) ) # Add caching interceptor (RequestToResponse) # Activate if ANY of these are set: reuse_cached, save_responses, save_requests, generate_html_report # For caching interceptor, use caching_dir directly if provided, otherwise use output_dir/cache if legacy_config["caching_dir"] is not None: cache_dir = legacy_config["caching_dir"] else: # Use output_dir/cache if output_dir exists, otherwise /tmp/cache output_dir = AdapterConfig._get_default_output_dir( legacy_config, run_config ) if output_dir: cache_dir = f"{output_dir}/cache" else: cache_dir = "/tmp/cache" # Values are now available directly from legacy_config (merged with defaults) generate_html_report = legacy_config["generate_html_report"] max_html_report_size = legacy_config["html_report_size"] # Check if caching should be activated should_activate = any( [ legacy_config["use_caching"], legacy_config["save_responses"], legacy_config["save_requests"], generate_html_report, ] ) if should_activate: # Determine save settings based on generate_html_report if generate_html_report: save_requests = True save_responses = True if max_html_report_size is not None: # Handle None values in max() by filtering them out max_saved_requests_values = [max_html_report_size] max_saved_responses_values = [max_html_report_size] if legacy_config["max_saved_requests"] is not None: max_saved_requests_values.append( legacy_config["max_saved_requests"] ) if legacy_config["max_saved_responses"] is not None: max_saved_responses_values.append( legacy_config["max_saved_responses"] ) max_saved_requests = max(max_saved_requests_values) max_saved_responses = max(max_saved_responses_values) else: max_saved_requests = legacy_config["max_saved_requests"] max_saved_responses = legacy_config["max_saved_responses"] else: save_requests = legacy_config["save_requests"] save_responses = legacy_config["save_responses"] max_saved_requests = legacy_config["max_saved_requests"] max_saved_responses = legacy_config["max_saved_responses"] config = { "cache_dir": cache_dir, "reuse_cached_responses": legacy_config["use_caching"], "save_requests": save_requests, "save_responses": save_responses, } if max_saved_requests is not None: config["max_saved_requests"] = max_saved_requests if max_saved_responses is not None: config["max_saved_responses"] = max_saved_responses interceptors.append( InterceptorConfig( name="caching", enabled=True, config=config, ) ) # Add the final request interceptor - either nvcf or endpoint if legacy_config["use_nvcf"]: interceptors.append( InterceptorConfig( name="nvcf", enabled=True, config={}, ) ) else: # Only add endpoint if nvcf is not used interceptors.append(InterceptorConfig(name="endpoint")) # Add response stats interceptor right after endpoint if tracking is enabled # Default to True if not explicitly set in legacy config if legacy_config["tracking_requests_stats"]: # Use caching_dir if provided, otherwise use output_dir/response_stats_cache cache_dir = cls._get_default_cache_dir( legacy_config, run_config, "response_stats_cache" ) config = { "cache_dir": cache_dir, "logging_aggregated_stats_interval": legacy_config[ "logging_aggregated_stats_interval" ], } interceptors.append( InterceptorConfig( name="response_stats", enabled=True, config=config, ) ) if legacy_config["use_response_logging"]: config = { "output_dir": cls._get_default_output_dir(legacy_config, run_config) } if legacy_config["max_logged_responses"] is not None: config["max_responses"] = legacy_config["max_logged_responses"] interceptors.append( InterceptorConfig( name="response_logging", config=config, ) ) if legacy_config["use_reasoning"]: from nemo_evaluator.logging import get_logger logger = get_logger(__name__) logger.warning( '"use_reasoning" is deprecated as it might suggest it touches on switching on/off reasoning for mode when it does not. Use "process_reasoning_traces" instead.' ) # since we aim at parity between process_reasoning_traces and use_reasoning during deprecation period: legacy_config["process_reasoning_traces"] = legacy_config["use_reasoning"] if legacy_config["process_reasoning_traces"]: # give parity back legacy_config["use_reasoning"] = legacy_config["process_reasoning_traces"] config = { "end_reasoning_token": legacy_config["end_reasoning_token"], } if legacy_config["start_reasoning_token"] is not None: config["start_reasoning_token"] = legacy_config["start_reasoning_token"] if legacy_config["include_if_reasoning_not_finished"] is not None: config["include_if_not_finished"] = legacy_config[ "include_if_reasoning_not_finished" ] if legacy_config["track_reasoning"] is not None: config["enable_reasoning_tracking"] = legacy_config["track_reasoning"] # Enable caching for reasoning interceptor when tracking requests stats # Default to True if not explicitly set in legacy config if legacy_config["tracking_requests_stats"]: config["save_individuals"] = True # Use caching_dir if provided, otherwise use output_dir/reasoning_stats_cache cache_dir = cls._get_default_cache_dir( legacy_config, run_config, "reasoning_stats_cache" ) config["cache_dir"] = cache_dir # Add logging interval for aggregated stats config["logging_aggregated_stats_interval"] = legacy_config[ "logging_aggregated_stats_interval" ] interceptors.append( InterceptorConfig( name="reasoning", config=config, ) ) if legacy_config["use_progress_tracking"]: config = { "progress_tracking_interval": legacy_config[ "progress_tracking_interval" ], "request_method": "POST", # Legacy method uses POST "output_dir": cls._get_default_output_dir(legacy_config, run_config), } if legacy_config["progress_tracking_url"] is not None: config["progress_tracking_url"] = legacy_config["progress_tracking_url"] interceptors.append( InterceptorConfig( name="progress_tracking", config=config, ) ) # Add raise client errors interceptor if specified (Response) if legacy_config["use_raise_client_errors"]: # Get default values from the interceptor's Params class from nemo_evaluator.adapters.interceptors.raise_client_error_interceptor import ( RaiseClientErrorInterceptor, ) from nemo_evaluator.logging import get_logger logger = get_logger(__name__) default_params = RaiseClientErrorInterceptor.Params() interceptors.append( InterceptorConfig( name="raise_client_errors", enabled=True, config={}, ) ) logger.warning( "RaiseClientErrorInterceptor configured with default values. " f"This will raise exceptions for 4xx status codes ({default_params.status_code_range_start}-{default_params.status_code_range_end}) " f"excluding {default_params.exclude_status_codes}. " "Consider explicitly configuring the interceptor parameters for your specific use case." ) # Convert legacy HTML report generation to post-eval hook # Value is now available directly from legacy_config (merged with defaults) generate_html_report = legacy_config["generate_html_report"] if generate_html_report: report_types = ["html"] if legacy_config["include_json"]: report_types.append("json") post_eval_hooks.append( PostEvalHookConfig( name="post_eval_report", enabled=True, config={ "report_types": report_types, "html_report_size": legacy_config["html_report_size"], }, ) ) return cls( interceptors=interceptors, post_eval_hooks=post_eval_hooks, endpoint_type=legacy_config["endpoint_type"], log_failed_requests=legacy_config["log_failed_requests"], )
[docs] def get_interceptor_configs(self) -> dict[str, dict[str, Any]]: """Get interceptor configurations as a dictionary""" return {ic.name: ic.config for ic in self.interceptors if ic.enabled}
[docs] def get_post_eval_hook_configs(self) -> dict[str, dict[str, Any]]: """Get post-evaluation hook configurations as a dictionary""" return {hook.name: hook.config for hook in self.post_eval_hooks if hook.enabled}